[S2GRAPH-57]: Change package names to org.apache.s2graph; refactor package names across all projects.
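For orientation (this sketch is not part of the commit), the file moves listed in the summary below translate into the following old-to-new package mapping for downstream code; the class names are taken from that summary purely for illustration, and example.downstream is a hypothetical consumer package.

    package example.downstream // hypothetical consumer project, for illustration only

    // Old package (removed in this commit)   ->  New package (added in this commit)
    // com.kakao.s2graph.core                 ->  org.apache.s2graph.core
    // s2.counter.core (s2counter_core)       ->  org.apache.s2graph.counter.core
    // spark / subscriber (loader)            ->  org.apache.s2graph.loader.spark / .subscriber
    // s2.spark (spark utilities)             ->  org.apache.s2graph.spark.spark
    // controllers (s2rest_play)              ->  org.apache.s2graph.rest.play.controllers
    import org.apache.s2graph.core.{Graph, Management}
    import org.apache.s2graph.counter.core.RankingCounter
    import org.apache.s2graph.loader.spark.HBaseContext
    import org.apache.s2graph.loader.subscriber.TransferToHFile
    import org.apache.s2graph.spark.spark.SparkApp

    object PackageRenameSketch // no members needed; the imports above are the point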
JIRA: [S2GRAPH-57] https://issues.apache.org/jira/browse/S2GRAPH-57 Pull Request: Closes #40 Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b6fe32fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b6fe32fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b6fe32fc Branch: refs/heads/master Commit: b6fe32fc2c3e15cea544d1af6b7698862e26d0ab Parents: e71264d Author: DO YUNG YOON <[email protected]> Authored: Tue Mar 15 18:26:19 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Mar 15 18:27:34 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + dev_support/docker-compose.yml | 3 - .../loader/spark/BulkLoadPartitioner.scala | 56 + .../loader/spark/FamilyHFileWriteOptions.scala | 35 + .../s2graph/loader/spark/HBaseContext.scala | 849 ++++++++++++ .../loader/spark/HBaseDStreamFunctions.scala | 158 +++ .../loader/spark/HBaseRDDFunctions.scala | 207 +++ .../s2graph/loader/spark/JavaHBaseContext.scala | 342 +++++ .../loader/spark/KeyFamilyQualifier.scala | 46 + .../loader/subscriber/GraphSubscriber.scala | 335 +++++ .../loader/subscriber/TransferToHFile.scala | 194 +++ .../s2graph/loader/subscriber/WalLogStat.scala | 90 ++ .../loader/subscriber/WalLogToHDFS.scala | 148 +++ .../main/scala/spark/BulkLoadPartitioner.scala | 56 - .../ColumnFamilyQualifierMapKeyWrapper.scala | 73 -- loader/src/main/scala/spark/DefaultSource.scala | 982 -------------- .../scala/spark/FamilyHFileWriteOptions.scala | 35 - loader/src/main/scala/spark/HBaseContext.scala | 849 ------------ .../scala/spark/HBaseDStreamFunctions.scala | 158 --- .../main/scala/spark/HBaseRDDFunctions.scala | 207 --- .../src/main/scala/spark/JavaHBaseContext.scala | 342 ----- .../main/scala/spark/KeyFamilyQualifier.scala | 46 - .../main/scala/subscriber/GraphSubscriber.scala | 339 ----- .../subscriber/GraphSubscriberStreaming.scala | 103 -- .../src/main/scala/subscriber/KafkaToHdfs.scala | 111 -- .../main/scala/subscriber/TestEdgeBuilder.scala | 66 - .../main/scala/subscriber/TransferToHFile.scala | 195 --- .../scala/subscriber/VertexDegreeBuilder.scala | 102 -- .../src/main/scala/subscriber/WalLogStat.scala | 94 -- .../main/scala/subscriber/WalLogToHDFS.scala | 149 --- .../loader/subscriber/GraphSubscriberTest.scala | 45 + .../loader/subscriber/TransferToHFileTest.scala | 169 +++ .../scala/subscriber/GraphSubscriberTest.scala | 45 - .../scala/subscriber/TransferToHFileTest.scala | 169 --- .../scala/com/kakao/s2graph/core/Edge.scala | 568 -------- .../kakao/s2graph/core/ExceptionHandler.scala | 162 --- .../scala/com/kakao/s2graph/core/Graph.scala | 381 ------ .../com/kakao/s2graph/core/GraphElement.scala | 12 - .../kakao/s2graph/core/GraphExceptions.scala | 29 - .../com/kakao/s2graph/core/GraphUtil.scala | 138 -- .../com/kakao/s2graph/core/JSONParser.scala | 134 -- .../com/kakao/s2graph/core/Management.scala | 368 ------ .../com/kakao/s2graph/core/OrderingUtil.scala | 146 --- .../com/kakao/s2graph/core/PostProcess.scala | 554 -------- .../com/kakao/s2graph/core/QueryParam.scala | 583 --------- .../com/kakao/s2graph/core/QueryResult.scala | 42 - .../scala/com/kakao/s2graph/core/Vertex.scala | 107 -- .../com/kakao/s2graph/core/mysqls/Bucket.scala | 66 - .../kakao/s2graph/core/mysqls/ColumnMeta.scala | 110 -- .../kakao/s2graph/core/mysqls/Experiment.scala | 78 -- .../com/kakao/s2graph/core/mysqls/Label.scala | 372 ------ 
.../kakao/s2graph/core/mysqls/LabelIndex.scala | 143 -- .../kakao/s2graph/core/mysqls/LabelMeta.scala | 151 --- .../com/kakao/s2graph/core/mysqls/Model.scala | 100 -- .../com/kakao/s2graph/core/mysqls/Service.scala | 96 -- .../s2graph/core/mysqls/ServiceColumn.scala | 82 -- .../s2graph/core/parsers/WhereParser.scala | 207 --- .../kakao/s2graph/core/rest/RequestParser.scala | 609 --------- .../kakao/s2graph/core/rest/RestHandler.scala | 245 ---- .../s2graph/core/storage/Deserializable.scala | 25 - .../kakao/s2graph/core/storage/SKeyValue.scala | 47 - .../s2graph/core/storage/Serializable.scala | 10 - .../kakao/s2graph/core/storage/Storage.scala | 1224 ------------------ .../core/storage/StorageDeserializable.scala | 95 -- .../core/storage/StorageSerializable.scala | 41 - .../core/storage/hbase/AsynchbaseStorage.scala | 533 -------- .../tall/IndexEdgeDeserializable.scala | 132 -- .../indexedge/tall/IndexEdgeSerializable.scala | 53 - .../wide/IndexEdgeDeserializable.scala | 116 -- .../indexedge/wide/IndexEdgeSerializable.scala | 53 - .../tall/SnapshotEdgeDeserializable.scala | 84 -- .../tall/SnapshotEdgeSerializable.scala | 47 - .../wide/SnapshotEdgeDeserializable.scala | 70 - .../wide/SnapshotEdgeSerializable.scala | 50 - .../serde/vertex/VertexDeserializable.scala | 46 - .../serde/vertex/VertexSerializable.scala | 20 - .../kakao/s2graph/core/types/HBaseType.scala | 163 --- .../kakao/s2graph/core/types/InnerValLike.scala | 246 ---- .../s2graph/core/types/LabelWithDirection.scala | 65 - .../com/kakao/s2graph/core/types/VertexId.scala | 145 --- .../kakao/s2graph/core/types/v1/InnerVal.scala | 226 ---- .../kakao/s2graph/core/types/v2/InnerVal.scala | 162 --- .../kakao/s2graph/core/utils/DeferCache.scala | 82 -- .../kakao/s2graph/core/utils/Extentions.scala | 72 -- .../kakao/s2graph/core/utils/FutureCache.scala | 82 -- .../com/kakao/s2graph/core/utils/Logger.scala | 44 - .../s2graph/core/utils/SafeUpdateCache.scala | 62 - .../scala/org/apache/s2graph/core/Edge.scala | 567 ++++++++ .../apache/s2graph/core/ExceptionHandler.scala | 159 +++ .../scala/org/apache/s2graph/core/Graph.scala | 379 ++++++ .../org/apache/s2graph/core/GraphElement.scala | 10 + .../apache/s2graph/core/GraphExceptions.scala | 28 + .../org/apache/s2graph/core/GraphUtil.scala | 139 ++ .../org/apache/s2graph/core/JSONParser.scala | 134 ++ .../org/apache/s2graph/core/Management.scala | 368 ++++++ .../org/apache/s2graph/core/OrderingUtil.scala | 146 +++ .../org/apache/s2graph/core/PostProcess.scala | 553 ++++++++ .../org/apache/s2graph/core/QueryParam.scala | 583 +++++++++ .../org/apache/s2graph/core/QueryResult.scala | 42 + .../scala/org/apache/s2graph/core/Vertex.scala | 101 ++ .../org/apache/s2graph/core/mysqls/Bucket.scala | 66 + .../apache/s2graph/core/mysqls/ColumnMeta.scala | 110 ++ .../apache/s2graph/core/mysqls/Experiment.scala | 77 ++ .../org/apache/s2graph/core/mysqls/Label.scala | 372 ++++++ .../apache/s2graph/core/mysqls/LabelIndex.scala | 143 ++ .../apache/s2graph/core/mysqls/LabelMeta.scala | 150 +++ .../org/apache/s2graph/core/mysqls/Model.scala | 100 ++ .../apache/s2graph/core/mysqls/Service.scala | 94 ++ .../s2graph/core/mysqls/ServiceColumn.scala | 82 ++ .../s2graph/core/parsers/WhereParser.scala | 207 +++ .../s2graph/core/rest/RequestParser.scala | 610 +++++++++ .../apache/s2graph/core/rest/RestHandler.scala | 244 ++++ .../s2graph/core/storage/Deserializable.scala | 24 + .../apache/s2graph/core/storage/SKeyValue.scala | 47 + .../s2graph/core/storage/Serializable.scala | 8 + 
.../apache/s2graph/core/storage/Storage.scala | 1224 ++++++++++++++++++ .../core/storage/StorageDeserializable.scala | 95 ++ .../core/storage/StorageSerializable.scala | 41 + .../core/storage/hbase/AsynchbaseStorage.scala | 534 ++++++++ .../tall/IndexEdgeDeserializable.scala | 132 ++ .../indexedge/tall/IndexEdgeSerializable.scala | 53 + .../wide/IndexEdgeDeserializable.scala | 116 ++ .../indexedge/wide/IndexEdgeSerializable.scala | 52 + .../tall/SnapshotEdgeDeserializable.scala | 84 ++ .../tall/SnapshotEdgeSerializable.scala | 47 + .../wide/SnapshotEdgeDeserializable.scala | 70 + .../wide/SnapshotEdgeSerializable.scala | 51 + .../serde/vertex/VertexDeserializable.scala | 47 + .../serde/vertex/VertexSerializable.scala | 19 + .../apache/s2graph/core/types/HBaseType.scala | 160 +++ .../s2graph/core/types/InnerValLike.scala | 243 ++++ .../s2graph/core/types/LabelWithDirection.scala | 61 + .../apache/s2graph/core/types/VertexId.scala | 142 ++ .../apache/s2graph/core/types/v1/InnerVal.scala | 224 ++++ .../apache/s2graph/core/types/v2/InnerVal.scala | 160 +++ .../apache/s2graph/core/utils/DeferCache.scala | 82 ++ .../apache/s2graph/core/utils/Extentions.scala | 73 ++ .../apache/s2graph/core/utils/FutureCache.scala | 82 ++ .../org/apache/s2graph/core/utils/Logger.scala | 44 + .../s2graph/core/utils/SafeUpdateCache.scala | 62 + .../scala/com/kakao/s2graph/core/EdgeTest.scala | 536 -------- .../kakao/s2graph/core/Integrate/CrudTest.scala | 226 ---- .../core/Integrate/IntegrateCommon.scala | 309 ----- .../s2graph/core/Integrate/QueryTest.scala | 909 ------------- .../core/Integrate/StrongLabelDeleteTest.scala | 283 ---- .../core/Integrate/VertexTestHelper.scala | 71 - .../core/Integrate/WeakLabelDeleteTest.scala | 136 -- .../com/kakao/s2graph/core/JsonParserTest.scala | 66 - .../kakao/s2graph/core/OrderingUtilTest.scala | 130 -- .../com/kakao/s2graph/core/QueryParamTest.scala | 86 -- .../com/kakao/s2graph/core/TestCommon.scala | 192 --- .../s2graph/core/TestCommonWithModels.scala | 197 --- .../com/kakao/s2graph/core/VertexTest.scala | 79 -- .../kakao/s2graph/core/models/ModelTest.scala | 135 -- .../s2graph/core/mysqls/ExperimentSpec.scala | 64 - .../s2graph/core/parsers/WhereParserTest.scala | 234 ---- .../hbase/AsynchbaseQueryBuilderTest.scala | 53 - .../storage/hbase/AsynchbaseStorageTest.scala | 36 - .../core/storage/hbase/IndexEdgeTest.scala | 81 -- .../s2graph/core/types/CompositeIdTest.scala | 109 -- .../kakao/s2graph/core/types/EdgeTypeTest.scala | 69 - .../kakao/s2graph/core/types/InnerValTest.scala | 130 -- .../s2graph/core/types/SourceVertexIdTest.scala | 52 - .../s2graph/core/types/TargetVertexIdTest.scala | 53 - .../kakao/s2graph/core/types/VertexIdTest.scala | 52 - .../s2graph/core/types/VertexTypeTest.scala | 42 - .../org/apache/s2graph/core/EdgeTest.scala | 535 ++++++++ .../s2graph/core/Integrate/CrudTest.scala | 226 ++++ .../core/Integrate/IntegrateCommon.scala | 309 +++++ .../s2graph/core/Integrate/QueryTest.scala | 906 +++++++++++++ .../core/Integrate/StrongLabelDeleteTest.scala | 283 ++++ .../core/Integrate/VertexTestHelper.scala | 71 + .../core/Integrate/WeakLabelDeleteTest.scala | 136 ++ .../apache/s2graph/core/JsonParserTest.scala | 66 + .../apache/s2graph/core/OrderingUtilTest.scala | 130 ++ .../apache/s2graph/core/QueryParamTest.scala | 86 ++ .../org/apache/s2graph/core/TestCommon.scala | 188 +++ .../s2graph/core/TestCommonWithModels.scala | 194 +++ .../apache/s2graph/core/models/ModelTest.scala | 130 ++ .../s2graph/core/mysqls/ExperimentSpec.scala | 64 + 
.../s2graph/core/parsers/WhereParserTest.scala | 234 ++++ .../storage/hbase/AsynchbaseStorageTest.scala | 33 + .../core/storage/hbase/IndexEdgeTest.scala | 81 ++ .../s2graph/core/types/InnerValTest.scala | 128 ++ .../org/apache/s2graph/counter/TrxLog.scala | 25 + .../counter/config/ConfigFunctions.scala | 30 + .../counter/config/S2CounterConfig.scala | 44 + .../apache/s2graph/counter/core/BytesUtil.scala | 13 + .../s2graph/counter/core/ExactCounter.scala | 243 ++++ .../apache/s2graph/counter/core/ExactKey.scala | 26 + .../s2graph/counter/core/ExactQualifier.scala | 73 ++ .../s2graph/counter/core/ExactStorage.scala | 32 + .../s2graph/counter/core/RankingCounter.scala | 101 ++ .../s2graph/counter/core/RankingKey.scala | 4 + .../s2graph/counter/core/RankingResult.scala | 3 + .../s2graph/counter/core/RankingStorage.scala | 17 + .../s2graph/counter/core/RankingValue.scala | 15 + .../s2graph/counter/core/RateRankingValue.scala | 15 + .../s2graph/counter/core/TimedQualifier.scala | 215 +++ .../s2graph/counter/core/v1/BytesUtilV1.scala | 60 + .../core/v1/ExactStorageAsyncHBase.scala | 317 +++++ .../counter/core/v1/ExactStorageHBase.scala | 326 +++++ .../counter/core/v1/RankingStorageRedis.scala | 185 +++ .../s2graph/counter/core/v2/BytesUtilV2.scala | 88 ++ .../counter/core/v2/ExactStorageGraph.scala | 343 +++++ .../counter/core/v2/GraphOperation.scala | 48 + .../counter/core/v2/RankingStorageGraph.scala | 395 ++++++ .../s2graph/counter/decay/DecayFormula.scala | 6 + .../s2graph/counter/decay/ExpDecayFormula.scala | 25 + .../s2graph/counter/helper/CounterAdmin.scala | 113 ++ .../counter/helper/DistributedScanner.scala | 70 + .../counter/helper/HashShardingJedis.scala | 153 +++ .../s2graph/counter/helper/Management.scala | 143 ++ .../s2graph/counter/helper/WithHBase.scala | 98 ++ .../s2graph/counter/helper/WithRedis.scala | 59 + .../s2graph/counter/models/CachedDBModel.scala | 11 + .../apache/s2graph/counter/models/Counter.scala | 210 +++ .../apache/s2graph/counter/models/DBModel.scala | 24 + .../org/apache/s2graph/counter/package.scala | 8 + .../s2graph/counter/util/CartesianProduct.scala | 8 + .../s2graph/counter/util/CollectionCache.scala | 66 + .../s2graph/counter/util/FunctionParser.scala | 21 + .../apache/s2graph/counter/util/Hashes.scala | 21 + .../s2graph/counter/util/ReduceMapValue.scala | 9 + .../org/apache/s2graph/counter/util/Retry.scala | 44 + .../s2graph/counter/util/SplitBytes.scala | 21 + .../s2graph/counter/util/UnitConverter.scala | 25 + .../main/scala/s2/config/ConfigFunctions.scala | 33 - .../main/scala/s2/config/S2CounterConfig.scala | 47 - .../src/main/scala/s2/counter/TrxLog.scala | 28 - .../main/scala/s2/counter/core/BytesUtil.scala | 15 - .../scala/s2/counter/core/ExactCounter.scala | 247 ---- .../main/scala/s2/counter/core/ExactKey.scala | 29 - .../scala/s2/counter/core/ExactQualifier.scala | 78 -- .../scala/s2/counter/core/ExactStorage.scala | 35 - .../scala/s2/counter/core/RankingCounter.scala | 105 -- .../main/scala/s2/counter/core/RankingKey.scala | 6 - .../scala/s2/counter/core/RankingResult.scala | 6 - .../scala/s2/counter/core/RankingStorage.scala | 19 - .../scala/s2/counter/core/RankingValue.scala | 19 - .../s2/counter/core/RateRankingValue.scala | 18 - .../scala/s2/counter/core/TimedQualifier.scala | 218 ---- .../scala/s2/counter/core/v1/BytesUtilV1.scala | 64 - .../core/v1/ExactStorageAsyncHBase.scala | 321 ----- .../s2/counter/core/v1/ExactStorageHBase.scala | 327 ----- .../counter/core/v1/RankingStorageRedis.scala | 189 --- 
.../scala/s2/counter/core/v2/BytesUtilV2.scala | 91 -- .../s2/counter/core/v2/ExactStorageGraph.scala | 346 ----- .../s2/counter/core/v2/GraphOperation.scala | 52 - .../counter/core/v2/RankingStorageGraph.scala | 399 ------ .../scala/s2/counter/decay/DecayFormula.scala | 9 - .../s2/counter/decay/ExpDecayFormula.scala | 28 - .../src/main/scala/s2/counter/package.scala | 11 - .../src/main/scala/s2/helper/CounterAdmin.scala | 114 -- .../scala/s2/helper/DistributedScanner.scala | 74 -- .../scala/s2/helper/HashShardingJedis.scala | 156 --- .../src/main/scala/s2/helper/Management.scala | 146 --- .../src/main/scala/s2/helper/WithHBase.scala | 101 -- .../src/main/scala/s2/helper/WithRedis.scala | 62 - .../main/scala/s2/models/CachedDBModel.scala | 14 - .../src/main/scala/s2/models/Counter.scala | 213 --- .../src/main/scala/s2/models/DBModel.scala | 27 - .../main/scala/s2/util/CartesianProduct.scala | 11 - .../main/scala/s2/util/CollectionCache.scala | 69 - .../src/main/scala/s2/util/FunctionParser.scala | 24 - .../src/main/scala/s2/util/Hashes.scala | 24 - .../src/main/scala/s2/util/ReduceMapValue.scala | 12 - .../src/main/scala/s2/util/Retry.scala | 47 - .../src/main/scala/s2/util/SplitBytes.scala | 24 - .../src/main/scala/s2/util/UnitConverter.scala | 28 - .../counter/core/RankingCounterSpec.scala | 166 +++ .../counter/models/CounterModelSpec.scala | 50 + .../s2graph/counter/models/CounterSpec.scala | 33 + .../s2/counter/core/RankingCounterSpec.scala | 169 --- .../test/scala/s2/models/CounterModelSpec.scala | 53 - .../src/test/scala/s2/models/CounterSpec.scala | 36 - .../counter/loader/CounterBulkLoader.scala | 78 ++ .../counter/loader/EraseDailyCounter.scala | 133 ++ .../counter/loader/config/StreamingConfig.scala | 24 + .../loader/core/CounterEtlFunctions.scala | 86 ++ .../counter/loader/core/CounterEtlItem.scala | 39 + .../counter/loader/core/CounterFunctions.scala | 446 +++++++ .../counter/loader/core/DimensionProps.scala | 150 +++ .../loader/models/DefaultCounterModel.scala | 6 + .../counter/loader/stream/EtlStreaming.scala | 114 ++ .../loader/stream/ExactCounterStreaming.scala | 69 + .../loader/stream/GraphToETLStreaming.scala | 80 ++ .../loader/stream/RankingCounterStreaming.scala | 72 ++ .../main/scala/s2/config/StreamingConfig.scala | 24 - .../scala/s2/counter/CounterBulkLoader.scala | 78 -- .../scala/s2/counter/EraseDailyCounter.scala | 133 -- .../s2/counter/core/CounterEtlFunctions.scala | 89 -- .../scala/s2/counter/core/CounterEtlItem.scala | 43 - .../s2/counter/core/CounterFunctions.scala | 446 ------- .../scala/s2/counter/core/DimensionProps.scala | 154 --- .../scala/s2/counter/stream/EtlStreaming.scala | 116 -- .../counter/stream/ExactCounterStreaming.scala | 72 -- .../s2/counter/stream/GraphToETLStreaming.scala | 83 -- .../stream/RankingCounterStreaming.scala | 74 -- .../scala/s2/models/DefaultCounterModel.scala | 8 - .../loader/core/CounterEtlFunctionsSpec.scala | 33 + .../loader/core/DimensionPropsTest.scala | 46 + .../stream/ExactCounterStreamingSpec.scala | 198 +++ .../stream/RankingCounterStreamingSpec.scala | 451 +++++++ .../counter/core/CounterEtlFunctionsSpec.scala | 33 - .../s2/counter/core/DimensionPropsTest.scala | 46 - .../stream/ExactCounterStreamingSpec.scala | 196 --- .../stream/RankingCounterStreamingSpec.scala | 448 ------- .../src/main/resources/application.conf | 0 s2rest_netty/src/main/scala/Server.scala | 202 --- .../org/apache/s2graph/rest/netty/Server.scala | 200 +++ s2rest_play/app/Bootstrap.scala | 81 -- s2rest_play/app/actors/QueueActor.scala | 92 -- 
s2rest_play/app/config/Config.scala | 41 - s2rest_play/app/config/CounterConfig.scala | 10 - .../app/controllers/AdminController.scala | 424 ------ .../app/controllers/ApplicationController.scala | 101 -- .../app/controllers/CounterController.scala | 747 ----------- .../app/controllers/EdgeController.scala | 219 ---- .../app/controllers/ExperimentController.scala | 24 - .../app/controllers/JsonBodyParser.scala | 86 -- .../app/controllers/PublishController.scala | 54 - .../app/controllers/QueryController.scala | 52 - .../app/controllers/TestController.scala | 24 - .../app/controllers/VertexController.scala | 86 -- s2rest_play/app/models/ExactCounterItem.scala | 38 - s2rest_play/app/models/RankCounterItem.scala | 40 - s2rest_play/app/models/package.scala | 8 - .../apache/s2graph/rest/play/Bootstrap.scala | 81 ++ .../s2graph/rest/play/actors/QueueActor.scala | 89 ++ .../s2graph/rest/play/config/Config.scala | 41 + .../rest/play/config/CounterConfig.scala | 10 + .../rest/play/controllers/AdminController.scala | 424 ++++++ .../controllers/ApplicationController.scala | 101 ++ .../play/controllers/CounterController.scala | 744 +++++++++++ .../rest/play/controllers/EdgeController.scala | 219 ++++ .../play/controllers/ExperimentController.scala | 23 + .../rest/play/controllers/JsonBodyParser.scala | 86 ++ .../play/controllers/PublishController.scala | 54 + .../rest/play/controllers/QueryController.scala | 52 + .../play/controllers/VertexController.scala | 85 ++ .../rest/play/models/ExactCounterItem.scala | 38 + .../rest/play/models/RankCounterItem.scala | 40 + .../s2graph/rest/play/models/package.scala | 10 + s2rest_play/app/util/TestDataLoader.scala | 70 - s2rest_play/conf/reference.conf | 2 +- s2rest_play/conf/routes | 165 ++- .../test/benchmark/BenchmarkCommon.scala | 24 - s2rest_play/test/benchmark/GraphUtilSpec.scala | 125 -- .../test/benchmark/JsonBenchmarkSpec.scala | 46 - .../benchmark/OrderingUtilBenchmarkSpec.scala | 100 -- .../test/benchmark/SamplingBenchmarkSpec.scala | 85 -- .../test/controllers/PostProcessSpec.scala | 112 -- .../rest/play/benchmark/BenchmarkCommon.scala | 24 + .../rest/play/benchmark/GraphUtilSpec.scala | 124 ++ .../rest/play/benchmark/JsonBenchmarkSpec.scala | 45 + .../benchmark/OrderingUtilBenchmarkSpec.scala | 98 ++ .../play/benchmark/SamplingBenchmarkSpec.scala | 87 ++ .../rest/play/controllers/PostProcessSpec.scala | 112 ++ .../s2graph/spark/config/S2ConfigFactory.scala | 20 + .../s2graph/spark/spark/HashMapParam.scala | 55 + .../apache/s2graph/spark/spark/RDDUtil.scala | 13 + .../apache/s2graph/spark/spark/SparkApp.scala | 120 ++ .../spark/spark/SubscriberListener.scala | 20 + .../apache/s2graph/spark/spark/WithKafka.scala | 69 + .../streaming/kafka/KafkaRDDFunctions.scala | 3 - .../spark/streaming/kafka/StreamHelper.scala | 4 - .../main/scala/s2/config/S2ConfigFactory.scala | 27 - .../src/main/scala/s2/spark/HashMapParam.scala | 55 - spark/src/main/scala/s2/spark/RDDUtil.scala | 16 - spark/src/main/scala/s2/spark/SparkApp.scala | 124 -- .../scala/s2/spark/SubscriberListener.scala | 24 - spark/src/main/scala/s2/spark/WithKafka.scala | 69 - .../org/apache/s2graph/spark/SparkAppTest.scala | 17 + .../s2graph/spark/TestStreamingSpec.scala | 31 + .../src/test/scala/s2/spark/SparkAppTest.scala | 21 - .../test/scala/s2/spark/TestStreamingSpec.scala | 34 - 377 files changed, 25150 insertions(+), 27480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/CHANGES 
---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4c3eadb..8b5fc63 100644 --- a/CHANGES +++ b/CHANGES @@ -69,6 +69,8 @@ Release 0.12.1 - unreleased S2GRAPH-5: Add Apache RAT to valid LICENSE errors. (Committed by DOYUNG YOON). S2GRAPH-17: Remove unnecessary abstraction layer, Storage. (Committed by DOYUNG YOON). + + S2GRAPH-57: Change package names into org.apahce.s2graph. (Committed by DOYUNG YOON). SUB TASKS http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/dev_support/docker-compose.yml ---------------------------------------------------------------------- diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml index 9818251..ba9d489 100644 --- a/dev_support/docker-compose.yml +++ b/dev_support/docker-compose.yml @@ -2,9 +2,6 @@ graph: image: s2rest_play:0.12.1-SNAPSHOT container_name: graph net: container:graph_hbase - links: - - graph_mysql - - graph_hbase graph_mysql: build: graph_mysql http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala new file mode 100644 index 0000000..cf73736 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.s2graph.loader.spark + +import java.util +import java.util.Comparator + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.Partitioner + +/** + * A Partitioner implementation that will separate records to different + * HBase Regions based on region splits + * + * @param startKeys The start keys for the given table + */ +class BulkLoadPartitioner(startKeys:Array[Array[Byte]]) + extends Partitioner { + + override def numPartitions: Int = startKeys.length + + override def getPartition(key: Any): Int = { + + val rowKey:Array[Byte] = + key match { + case qualifier: KeyFamilyQualifier => + qualifier.rowKey + case _ => + key.asInstanceOf[Array[Byte]] + } + + val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] { + override def compare(o1: Array[Byte], o2: Array[Byte]): Int = { + Bytes.compareTo(o1, o2) + } + } + val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator) + if (partition < 0) partition * -1 + -2 + else partition + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala new file mode 100644 index 0000000..a9886a2 --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.s2graph.loader.spark + +import java.io.Serializable + +/** + * This object will hold optional data for how a given column family's + * writer will work + * + * @param compression String to define the Compression to be used in the HFile + * @param bloomType String to define the bloom type to be used in the HFile + * @param blockSize The block size to be used in the HFile + * @param dataBlockEncoding String to define the data block encoding to be used + * in the HFile + */ +class FamilyHFileWriteOptions( val compression:String, + val bloomType: String, + val blockSize: Int, + val dataBlockEncoding: String) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala new file mode 100644 index 0000000..94e12af --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala @@ -0,0 +1,849 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.s2graph.loader.spark + +import java.net.InetSocketAddress +import java.util + +import org.apache.hadoop.hbase.fs.HFileSystem +import org.apache.hadoop.hbase._ +import org.apache.hadoop.hbase.io.compress.Compression +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding +import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder} +import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType} +import org.apache.hadoop.hbase.util.Bytes +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.hadoop.conf.Configuration +import HBaseRDDFunctions._ +import org.apache.hadoop.hbase.client._ +import scala.reflect.ClassTag +import org.apache.spark.{Logging, SerializableWritable, SparkContext} +import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, +TableInputFormat, IdentityTableMapper} +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.mapreduce.Job +import org.apache.spark.streaming.dstream.DStream +import java.io._ +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod +import org.apache.hadoop.fs.{Path, FileSystem} +import scala.collection.mutable + +/** + * HBaseContext is a façade for HBase operations + * like bulk put, get, increment, delete, and scan + * + * HBaseContext will take the responsibilities + * of disseminating the configuration information + * to the working and managing the life cycle of HConnections. + */ +class HBaseContext(@transient sc: SparkContext, + @transient config: Configuration, + val tmpHdfsConfgFile: String = null) + extends Serializable with Logging { + + @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + @transient var tmpHdfsConfiguration:Configuration = config + @transient var appliedCredentials = false + @transient val job = Job.getInstance(config) + TableMapReduceUtil.initCredentials(job) + val broadcastedConf = sc.broadcast(new SerializableWritable(config)) + val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials)) + + LatestHBaseContextCache.latest = this + + if (tmpHdfsConfgFile != null && config != null) { + val fs = FileSystem.newInstance(config) + val tmpPath = new Path(tmpHdfsConfgFile) + if (!fs.exists(tmpPath)) { + val outputStream = fs.create(tmpPath) + config.write(outputStream) + outputStream.close() + } else { + logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!") + } + } + + /** + * A simple enrichment of the traditional Spark RDD foreachPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param f Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + */ + def foreachPartition[T](rdd: RDD[T], + f: (Iterator[T], Connection) => Unit):Unit = { + rdd.foreachPartition( + it => hbaseForeachPartition(broadcastedConf, it, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming dStream foreach + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + */ + def foreachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit):Unit = { + dstream.foreachRDD((rdd, time) => { + foreachPartition(rdd, f) + }) + } + + /** + * A simple enrichment of the traditional Spark RDD mapPartition. + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * @param rdd Original RDD with data to iterate over + * @param mp Function to be given a iterator to iterate through + * the RDD values and a HConnection object to interact + * with HBase + * @return Returns a new RDD generated by the user definition + * function just like normal mapPartition + */ + def mapPartitions[T, R: ClassTag](rdd: RDD[T], + mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = { + + rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf, + it, + mp)) + + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * foreachPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamForeachPartition[T](dstream: DStream[T], + f: (Iterator[T], Connection) => Unit): Unit = { + + dstream.foreachRDD(rdd => this.foreachPartition(rdd, f)) + } + + /** + * A simple enrichment of the traditional Spark Streaming DStream + * mapPartition. + * + * This function differs from the original in that it offers the + * developer access to a already connected HConnection object + * + * Note: Do not close the HConnection object. 
All HConnection + * management is handled outside this method + * + * Note: Make sure to partition correctly to avoid memory issue when + * getting data from HBase + * + * @param dstream Original DStream with data to iterate over + * @param f Function to be given a iterator to iterate through + * the DStream values and a HConnection object to + * interact with HBase + * @return Returns a new DStream generated by the user + * definition function just like normal mapPartition + */ + def streamMapPartitions[T, U: ClassTag](dstream: DStream[T], + f: (Iterator[T], Connection) => Iterator[U]): + DStream[U] = { + dstream.mapPartitions(it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + f)) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take RDD + * and generate puts and send them to HBase. + * The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in the RDD to a HBase Put + */ + def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val m = connection.getBufferedMutator(TableName.valueOf(tName)) + iterator.foreach(T => m.mutate(f(T))) + m.flush() + m.close() + })) + } + + def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){ + credentials = SparkHadoopUtil.get.getCurrentUserCredentials() + + logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials) + + if (!appliedCredentials && credentials != null) { + appliedCredentials = true + + @transient val ugi = UserGroupInformation.getCurrentUser + ugi.addCredentials(credentials) + // specify that this is a proxy user + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY) + + ugi.addCredentials(credentialsConf.value.value) + } + } + + /** + * A simple abstraction over the HBaseContext.streamMapPartition method. + * + * It allow addition support for a user to take a DStream and + * generate puts and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to put into + * @param f Function to convert a value in + * the DStream to a HBase Put + */ + def streamBulkPut[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Put) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkPut(rdd, TableName.valueOf(tName), f) + }) + } + + /** + * A simple abstraction over the HBaseContext.foreachPartition method. + * + * It allow addition support for a user to take a RDD and generate delete + * and send them to HBase. The complexity of managing the HConnection is + * removed from the developer + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to delete from + * @param f Function to convert a value in the RDD to a + * HBase Deletes + * @param batchSize The number of delete to batch before sending to HBase + */ + def bulkDelete[T](rdd: RDD[T], tableName: TableName, + f: (T) => Delete, batchSize: Integer) { + bulkMutation(rdd, tableName, f, batchSize) + } + + /** + * A simple abstraction over the HBaseContext.streamBulkMutation method. 
+ * + * It allow addition support for a user to take a DStream and + * generate Delete and send them to HBase. + * + * The complexity of managing the HConnection is + * removed from the developer + * + * @param dstream Original DStream with data to iterate over + * @param tableName The name of the table to delete from + * @param f function to convert a value in the DStream to a + * HBase Delete + * @param batchSize The number of deletes to batch before sending to HBase + */ + def streamBulkDelete[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Delete, + batchSize: Integer) = { + streamBulkMutation(dstream, tableName, f, batchSize) + } + + /** + * Under lining function to support all bulk mutations + * + * May be opened up if requested + */ + private def bulkMutation[T](rdd: RDD[T], tableName: TableName, + f: (T) => Mutation, batchSize: Integer) { + + val tName = tableName.getName + rdd.foreachPartition( + it => hbaseForeachPartition[T]( + broadcastedConf, + it, + (iterator, connection) => { + val table = connection.getTable(TableName.valueOf(tName)) + val mutationList = new java.util.ArrayList[Mutation] + iterator.foreach(T => { + mutationList.add(f(T)) + if (mutationList.size >= batchSize) { + table.batch(mutationList, null) + mutationList.clear() + } + }) + if (mutationList.size() > 0) { + table.batch(mutationList, null) + mutationList.clear() + } + table.close() + })) + } + + /** + * Under lining function to support all bulk streaming mutations + * + * May be opened up if requested + */ + private def streamBulkMutation[T](dstream: DStream[T], + tableName: TableName, + f: (T) => Mutation, + batchSize: Integer) = { + val tName = tableName.getName + dstream.foreachRDD((rdd, time) => { + bulkMutation(rdd, TableName.valueOf(tName), f, batchSize) + }) + } + + /** + * A simple abstraction over the HBaseContext.mapPartition method. + * + * It allow addition support for a user to take a RDD and generates a + * new RDD based on Gets and the results they bring back from HBase + * + * @param rdd Original RDD with data to iterate over + * @param tableName The name of the table to get from + * @param makeGet function to convert a value in the RDD to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * RDD + * return new RDD that is created by the Get to HBase + */ + def bulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + rdd: RDD[T], + makeGet: (T) => Get, + convertResult: (Result) => U): RDD[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + rdd.mapPartitions[U](it => + hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * A simple abstraction over the HBaseContext.streamMap method. 
+ * + * It allow addition support for a user to take a DStream and + * generates a new DStream based on Gets and the results + * they bring back from HBase + * + * @param tableName The name of the table to get from + * @param batchSize The number of Gets to be sent in a single batch + * @param dStream Original DStream with data to iterate over + * @param makeGet Function to convert a value in the DStream to a + * HBase Get + * @param convertResult This will convert the HBase Result object to + * what ever the user wants to put in the resulting + * DStream + * @return A new DStream that is created by the Get to HBase + */ + def streamBulkGet[T, U: ClassTag](tableName: TableName, + batchSize: Integer, + dStream: DStream[T], + makeGet: (T) => Get, + convertResult: (Result) => U): DStream[U] = { + + val getMapPartition = new GetMapPartition(tableName, + batchSize, + makeGet, + convertResult) + + dStream.mapPartitions[U](it => hbaseMapPartition[T, U]( + broadcastedConf, + it, + getMapPartition.run)) + } + + /** + * This function will use the native HBase TableInputFormat with the + * given scan object to generate a new RDD + * + * @param tableName the name of the table to scan + * @param scan the HBase scan object to use to read data from HBase + * @param f function to convert a Result object from HBase into + * what the user wants in the final generated RDD + * @return new RDD with results from scan + */ + def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan, + f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = { + + val job: Job = Job.getInstance(getConf(broadcastedConf)) + + TableMapReduceUtil.initCredentials(job) + TableMapReduceUtil.initTableMapperJob(tableName, scan, + classOf[IdentityTableMapper], null, null, job) + + sc.newAPIHadoopRDD(job.getConfiguration, + classOf[TableInputFormat], + classOf[ImmutableBytesWritable], + classOf[Result]).map(f) + } + + /** + * A overloaded version of HBaseContext hbaseRDD that defines the + * type of the resulting RDD + * + * @param tableName the name of the table to scan + * @param scans the HBase scan object to use to read data from HBase + * @return New RDD with results from scan + * + */ + def hbaseRDD(tableName: TableName, scans: Scan): + RDD[(ImmutableBytesWritable, Result)] = { + + hbaseRDD[(ImmutableBytesWritable, Result)]( + tableName, + scans, + (r: (ImmutableBytesWritable, Result)) => r) + } + + /** + * underlining wrapper all foreach functions in HBaseContext + */ + private def hbaseForeachPartition[T](configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[T], + f: (Iterator[T], Connection) => Unit) = { + + val config = getConf(configBroadcast) + + applyCreds(configBroadcast) + // specify that this is a proxy user + val connection = ConnectionFactory.createConnection(config) + f(it, connection) + connection.close() + } + + private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): + Configuration = { + + if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) { + val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf) + val inputStream = fs.open(new Path(tmpHdfsConfgFile)) + tmpHdfsConfiguration = new Configuration(false) + tmpHdfsConfiguration.readFields(inputStream) + inputStream.close() + } + + if (tmpHdfsConfiguration == null) { + try { + tmpHdfsConfiguration = configBroadcast.value.value + } catch { + case ex: Exception => logError("Unable to getConfig from broadcast", ex) + } + } + tmpHdfsConfiguration + } + + /** + * underlining wrapper all mapPartition functions 
in HBaseContext + * + */ + private def hbaseMapPartition[K, U]( + configBroadcast: + Broadcast[SerializableWritable[Configuration]], + it: Iterator[K], + mp: (Iterator[K], Connection) => + Iterator[U]): Iterator[U] = { + + val config = getConf(configBroadcast) + applyCreds(configBroadcast) + + val connection = ConnectionFactory.createConnection(config) + val res = mp(it, connection) + connection.close() + res + + } + + /** + * underlining wrapper all get mapPartition functions in HBaseContext + */ + private class GetMapPartition[T, U](tableName: TableName, + batchSize: Integer, + makeGet: (T) => Get, + convertResult: (Result) => U) + extends Serializable { + + val tName = tableName.getName + + def run(iterator: Iterator[T], connection: Connection): Iterator[U] = { + val table = connection.getTable(TableName.valueOf(tName)) + + val gets = new java.util.ArrayList[Get]() + var res = List[U]() + + while (iterator.hasNext) { + gets.add(makeGet(iterator.next())) + + if (gets.size() == batchSize) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + } + if (gets.size() > 0) { + val results = table.get(gets) + res = res ++ results.map(convertResult) + gets.clear() + } + table.close() + res.iterator + } + } + + /** + * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef]. + * + * This method is used to keep ClassTags out of the external Java API, as + * the Java compiler cannot produce them automatically. While this + * ClassTag-faking does please the compiler, it can cause problems at runtime + * if the Scala API relies on ClassTags for correctness. + * + * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior, + * just worse performance or security issues. + * For instance, an Array of AnyRef can hold any type T, but may lose primitive + * specialization. + */ + private[spark] + def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] + + /** + * A Spark Implementation of HBase Bulk load + * + * This will take the content from an existing RDD then sort and shuffle + * it with respect to region splits. The result of that sort and shuffle + * will be written to HFiles. + * + * After this function is executed the user will have to call + * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase + * + * Also note this version of bulk load is different from past versions in + * that it includes the qualifier as part of the sort process. The + * reason for this is to be able to support rows will very large number + * of columns. 
+ * + * @param rdd The RDD we are bulk loading from + * @param tableName The HBase table we are loading into + * @param flatMap A flapMap function that will make every + * row in the RDD + * into N cells for the bulk load + * @param stagingDir The location on the FileSystem to bulk load into + * @param familyHFileWriteOptionsMap Options that will define how the HFile for a + * column family is written + * @param compactionExclude Compaction excluded for the HFiles + * @param maxSize Max size for the HFiles before they roll + * @tparam T The Type of values in the original RDD + */ + def bulkLoad[T](rdd:RDD[T], + tableName: TableName, + flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])], + stagingDir:String, + familyHFileWriteOptionsMap: + util.Map[Array[Byte], FamilyHFileWriteOptions] = + new util.HashMap[Array[Byte], FamilyHFileWriteOptions], + compactionExclude: Boolean = false, + maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE): + Unit = { + val conn = ConnectionFactory.createConnection(config) + val regionLocator = conn.getRegionLocator(tableName) + val startKeys = regionLocator.getStartKeys + val defaultCompressionStr = config.get("hfile.compression", + Compression.Algorithm.NONE.getName) + val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr) +// HFileWriterImpl +// .compressionByName(defaultCompressionStr) + val now = System.currentTimeMillis() + val tableNameByteArray = tableName.getName + + val familyHFileWriteOptionsMapInternal = + new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions] + + val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator() + + while (entrySetIt.hasNext) { + val entry = entrySetIt.next() + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue) + } + + /** + * This will return a new HFile writer when requested + * + * @param family column family + * @param conf configuration to connect to HBase + * @param favoredNodes nodes that we would like to write too + * @param fs FileSystem object where we will be writing the HFiles to + * @return WriterLength object + */ + def getNewWriter(family: Array[Byte], conf: Configuration, + favoredNodes: Array[InetSocketAddress], + fs:FileSystem, + familydir:Path): WriterLength = { + + + var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family)) + + if (familyOptions == null) { + familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString, + BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString) + familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions) + } + + val tempConf = new Configuration(conf) + tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f) + val contextBuilder = new HFileContextBuilder() + .withCompression(Algorithm.valueOf(familyOptions.compression)) + .withChecksumType(HStore.getChecksumType(conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)) + .withBlockSize(familyOptions.blockSize) + contextBuilder.withDataBlockEncoding(DataBlockEncoding. 
+ valueOf(familyOptions.dataBlockEncoding)) + val hFileContext = contextBuilder.build() + + if (null == favoredNodes) { + new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build()) +// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build()) + } else { + new WriterLength(0, + new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs)) + .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType)) + .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext) +// .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext) + .withFavoredNodes(favoredNodes).build()) + } + } + + val regionSplitPartitioner = + new BulkLoadPartitioner(startKeys) + + //This is where all the magic happens + //Here we are going to do the following things + // 1. FlapMap every row in the RDD into key column value tuples + // 2. Then we are going to repartition sort and shuffle + // 3. Finally we are going to write out our HFiles + rdd.flatMap( r => flatMap(r)). + repartitionAndSortWithinPartitions(regionSplitPartitioner). + hbaseForeachPartition(this, (it, conn) => { + + val conf = broadcastedConf.value.value + val fs = FileSystem.get(conf) + val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength] + var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY + var rollOverRequested = false + + /** + * This will roll all writers + */ + def rollWriters(): Unit = { + writerMap.values.foreach( wl => { + if (wl.writer != null) { + logDebug("Writer=" + wl.writer.getPath + + (if (wl.written == 0) "" else ", wrote=" + wl.written)) + close(wl.writer) + } + }) + writerMap.clear() + rollOverRequested = false + } + + /** + * This function will close a given HFile writer + * @param w The writer to close + */ + def close(w:StoreFile.Writer): Unit = { + if (w != null) { + w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())) + w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow))) + w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)) + w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)) + w.appendTrackedTimestampsToMetadata() + w.close() + } + } + + //Here is where we finally iterate through the data in this partition of the + //RDD that has been sorted and partitioned + it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) => + + //This will get a writer for the column family + //If there is no writer for a given column family then + //it will get created here. 
+ val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), { + + val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family)) + + fs.mkdirs(familyDir) + + val loc:HRegionLocation = { + try { + val locator = + conn.getRegionLocator(TableName.valueOf(tableNameByteArray)) + locator.getRegionLocation(keyFamilyQualifier.rowKey) + } catch { + case e: Throwable => + logWarning("there's something wrong when locating rowkey: " + + Bytes.toString(keyFamilyQualifier.rowKey)) + null + } + } + if (null == loc) { + if (log.isTraceEnabled) { + logTrace("failed to get region location, so use default writer: " + + Bytes.toString(keyFamilyQualifier.rowKey)) + } + getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null, + fs = fs, familydir = familyDir) + } else { + if (log.isDebugEnabled) { + logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]") + } + val initialIsa = + new InetSocketAddress(loc.getHostname, loc.getPort) + if (initialIsa.isUnresolved) { + if (log.isTraceEnabled) { + logTrace("failed to resolve bind address: " + loc.getHostname + ":" + + loc.getPort + ", so use default writer") + } + getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir) + } else { + if(log.isDebugEnabled) { + logDebug("use favored nodes writer: " + initialIsa.getHostString) + } + getNewWriter(keyFamilyQualifier.family, conf, + Array[InetSocketAddress](initialIsa), fs, familyDir) + } + } + }) + + val keyValue =new KeyValue(keyFamilyQualifier.rowKey, + keyFamilyQualifier.family, + keyFamilyQualifier.qualifier, + now,cellValue) + + wl.writer.append(keyValue) + wl.written += keyValue.getLength + + rollOverRequested = rollOverRequested || wl.written > maxSize + + //This will only roll if we have at least one column family file that is + //bigger then maxSize and we have finished a given row key + if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) { + rollWriters() + } + + previousRow = keyFamilyQualifier.rowKey + } + //We have finished all the data so lets close up the writers + rollWriters() + }) + } + + /** + * This is a wrapper class around StoreFile.Writer. 
The reason for the + * wrapper is to keep the length of the file along side the writer + * + * @param written The writer to be wrapped + * @param writer The number of bytes written to the writer + */ + class WriterLength(var written:Long, val writer:StoreFile.Writer) + + /** + * This is a wrapper over a byte array so it can work as + * a key in a hashMap + * + * @param o1 The Byte Array value + */ + class ByteArrayWrapper (val o1:Array[Byte]) + extends Comparable[ByteArrayWrapper] with Serializable { + override def compareTo(o2: ByteArrayWrapper): Int = { + Bytes.compareTo(o1,o2.o1) + } + override def equals(o2: Any): Boolean = { + o2 match { + case wrapper: ByteArrayWrapper => + Bytes.equals(o1, wrapper.o1) + case _ => + false + } + } + override def hashCode():Int = { + Bytes.hashCode(o1) + } + } +} + +object LatestHBaseContextCache { + var latest:HBaseContext = null +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala new file mode 100644 index 0000000..80f289b --- /dev/null +++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.s2graph.loader.spark + +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client._ +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.spark.streaming.dstream.DStream + +import scala.reflect.ClassTag + +/** + * HBaseDStreamFunctions contains a set of implicit functions that can be + * applied to a Spark DStream so that we can easily interact with HBase + */ +object HBaseDStreamFunctions { + + /** + * These are implicit methods for a DStream that contains any type of + * data. + * + * @param dStream This is for dStreams of any type + * @tparam T Type T + */ + implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) { + + /** + * Implicit method that gives easy access to HBaseContext's bulk + * put. This will not return a new Stream. Think of it like a foreach + * + * @param hc The hbaseContext object to identify which + * HBase cluster connection to use + * @param tableName The tableName that the put will be sent to + * @param f The function that will turn the DStream values + * into HBase Put objects. 
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.streamBulkPut(dStream, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get. This will return a new DStream. Think about it as a DStream map
+     * function. In that every DStream value will get a new value out of
+     * HBase. That new value will populate the newly generated DStream.
+     *
+     * @param hc            The hbaseContext object to identify which
+     *                      HBase cluster connection to use
+     * @param tableName     The tableName that the gets will be sent to
+     * @param batchSize     How many gets to execute in a single batch
+     * @param f             The function that will turn the DStream values
+     *                      into HBase Get objects
+     * @param convertResult The function that will convert a HBase
+     *                      Result object into a value that will go
+     *                      into the resulting DStream
+     * @tparam R            The type of Object that will be coming
+     *                      out of the resulting DStream
+     * @return              A resulting DStream with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                                  tableName: TableName,
+                                  batchSize: Int, f: (T) => Get, convertResult: (Result) => R):
+    DStream[R] = {
+      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get. This will return a new DStream. Think about it as a DStream map
+     * function. In that every DStream value will get a new value out of
+     * HBase. That new value will populate the newly generated DStream.
+     *
+     * @param hc        The hbaseContext object to identify which
+     *                  HBase cluster connection to use
+     * @param tableName The tableName that the gets will be sent to
+     * @param batchSize How many gets to execute in a single batch
+     * @param f         The function that will turn the DStream values
+     *                  into HBase Get objects
+     * @return          A resulting DStream of (ImmutableBytesWritable, Result) pairs
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                     tableName: TableName, batchSize: Int,
+                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+      hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+        tableName, batchSize, dStream, f,
+        result => (new ImmutableBytesWritable(result.getRow), result))
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete. This will not return a new DStream.
+     *
+     * @param hc        The hbaseContext object to identify which HBase
+     *                  cluster connection to use
+     * @param tableName The tableName that the deletes will be sent to
+     * @param f         The function that will convert the DStream value into
+     *                  a HBase Delete Object
+     * @param batchSize The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName,
+                        f: (T) => Delete, batchSize: Int): Unit = {
+      hc.streamBulkDelete(dStream, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method. This will act very much like a normal DStream
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc The hbaseContext object to identify which HBase
+     *           cluster connection to use
+     * @param f  This function will get an iterator for a Partition of a
+     *           DStream along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.streamForeachPartition(dStream, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method. This will act very much like a normal DStream
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc The hbaseContext object to identify which HBase
+     *           cluster connection to use
+     * @param f  This function will get an iterator for a Partition of a
+     *           DStream along with a connection object to HBase
+     * @tparam R This is the type of objects that will go into the resulting
+     *           DStream
+     * @return   A resulting DStream of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    DStream[R] = {
+      hc.streamMapPartitions(dStream, f)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..b818a3c
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.util
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.{HConstants, TableName}
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.rdd.RDD
+
+import scala.collection.immutable.HashMap
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions
+{
+
+  /**
+   * These are implicit methods for a RDD that contains any type of
+   * data.
+   *
+   * @param rdd This is for rdd of any type
+   * @tparam T This is any type
+   */
+  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put. This will not return a new RDD. Think of it like a foreach
+     *
+     * @param hc        The hbaseContext object to identify which
+     *                  HBase cluster connection to use
+     * @param tableName The tableName that the put will be sent to
+     * @param f         The function that will turn the RDD values
+     *                  into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.bulkPut(rdd, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get. This will return a new RDD. Think about it as a RDD map
+     * function. In that every RDD value will get a new value out of
+     * HBase. That new value will populate the newly generated RDD.
+     *
+     * @param hc            The hbaseContext object to identify which
+     *                      HBase cluster connection to use
+     * @param tableName     The tableName that the gets will be sent to
+     * @param batchSize     How many gets to execute in a single batch
+     * @param f             The function that will turn the RDD values
+     *                      into HBase Get objects
+     * @param convertResult The function that will convert a HBase
+     *                      Result object into a value that will go
+     *                      into the resulting RDD
+     * @tparam R            The type of Object that will be coming
+     *                      out of the resulting RDD
+     * @return              A resulting RDD with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                                  tableName: TableName, batchSize: Int,
+                                  f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get. This will return a new RDD. Think about it as a RDD map
+     * function. In that every RDD value will get a new value out of
+     * HBase. That new value will populate the newly generated RDD.
+     *
+     * @param hc        The hbaseContext object to identify which
+     *                  HBase cluster connection to use
+     * @param tableName The tableName that the gets will be sent to
+     * @param batchSize How many gets to execute in a single batch
+     * @param f         The function that will turn the RDD values
+     *                  into HBase Get objects
+     * @return          A resulting RDD of (ImmutableBytesWritable, Result) pairs
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                     tableName: TableName, batchSize: Int,
+                     f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+        batchSize, rdd, f,
+        result => if (result != null && result.getRow != null) {
+          (new ImmutableBytesWritable(result.getRow), result)
+        } else {
+          null
+        })
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete. This will not return a new RDD.
+     *
+     * @param hc        The hbaseContext object to identify which HBase
+     *                  cluster connection to use
+     * @param tableName The tableName that the deletes will be sent to
+     * @param f         The function that will convert the RDD value into
+     *                  a HBase Delete Object
+     * @param batchSize The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName, f: (T) => Delete, batchSize: Int): Unit = {
+      hc.bulkDelete(rdd, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method. This will act very much like a normal RDD
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc The hbaseContext object to identify which HBase
+     *           cluster connection to use
+     * @param f  This function will get an iterator for a Partition of an
+     *           RDD along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.foreachPartition(rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method. This will act very much like a normal RDD
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc The hbaseContext object to identify which HBase
+     *           cluster connection to use
+     * @param f  This function will get an iterator for a Partition of an
+     *           RDD along with a connection object to HBase
+     * @tparam R This is the type of objects that will go into the resulting
+     *           RDD
+     * @return   A resulting RDD of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    RDD[R] = {
+      hc.mapPartitions[T, R](rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoad method.
+     *
+     * A Spark Implementation of HBase Bulk load
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits. The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * Also note this version of bulk load is different from past versions in
+     * that it includes the qualifier as part of the sort process. The
+     * reason for this is to be able to support rows with a very large number
+     * of columns.
+     *
+     * @param tableName                  The HBase table we are loading into
+     * @param flatMap                    A flatMap function that will make every row in the RDD
+     *                                   into N cells for the bulk load
+     * @param stagingDir                 The location on the FileSystem to bulk load into
+     * @param familyHFileWriteOptionsMap Options that will define how the HFile for a
+     *                                   column family is written
+     * @param compactionExclude          Whether compaction should be excluded for the HFiles
+     * @param maxSize                    Max size for the HFiles before they roll
+     */
+    def hbaseBulkLoad(hc: HBaseContext,
+                      tableName: TableName,
+                      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                      stagingDir: String,
+                      familyHFileWriteOptionsMap:
+                        util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                        new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                      compactionExclude: Boolean = false,
+                      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
+      hc.bulkLoad(rdd, tableName,
+        flatMap, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
+  }
+}
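For reference, a minimal usage sketch of the hbaseBulkLoad helper added above. The table name ("s2graph"), column family/qualifier ("e"/"q"), staging directory, and Spark setup are illustrative assumptions, not values taken from this commit; only the org.apache.s2graph.loader.spark classes come from the diff above.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.s2graph.loader.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.s2graph.loader.spark.HBaseRDDFunctions._

object BulkLoadExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bulk-load-example"))
    // HBaseContext carries the HBase configuration to the executors.
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Hypothetical input: (rowKey, value) pairs destined for family "e", qualifier "q".
    val rdd = sc.parallelize(Seq(
      (Bytes.toBytes("row1"), Bytes.toBytes("value1")),
      (Bytes.toBytes("row2"), Bytes.toBytes("value2"))))

    // Each record is flat-mapped into (KeyFamilyQualifier, cellValue) pairs; bulkLoad
    // sorts and shuffles them by region split and writes HFiles under stagingDir.
    rdd.hbaseBulkLoad(hbaseContext,
      TableName.valueOf("s2graph"),
      (kv: (Array[Byte], Array[Byte])) =>
        Iterator((new KeyFamilyQualifier(kv._1, Bytes.toBytes("e"), Bytes.toBytes("q")), kv._2)),
      "/tmp/s2graph-bulkload")

    sc.stop()
  }
}

As the scaladoc notes, the HFiles left under the staging directory still have to be handed to HBase afterwards, e.g. with LoadIncrementalHFiles.doBulkLoad(...).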

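Similarly, a minimal sketch of the streaming-side helpers in HBaseDStreamFunctions. The socket source, batch interval, table name, and column family/qualifier are hypothetical and assume well-formed "rowKey,value" records; only the hbaseBulkPut/streamBulkPut API comes from the diff above.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.s2graph.loader.spark.HBaseContext
import org.apache.s2graph.loader.spark.HBaseDStreamFunctions._

object StreamBulkPutExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("stream-bulk-put-example"))
    val ssc = new StreamingContext(sc, Seconds(10))
    val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

    // Hypothetical source: a socket stream of "rowKey,value" records.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Every record becomes a Put on column family "e", qualifier "q";
    // HBaseContext.streamBulkPut batches the mutations per partition on the executors.
    lines.hbaseBulkPut(hbaseContext, TableName.valueOf("s2graph"), line => {
      val Array(rowKey, value) = line.split(",", 2)
      new Put(Bytes.toBytes(rowKey))
        .addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), Bytes.toBytes(value))
    })

    ssc.start()
    ssc.awaitTermination()
  }
}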