Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm into STORM-2623
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/832395a1 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/832395a1 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/832395a1 Branch: refs/heads/master Commit: 832395a1a88d31e6d40db9326fd86c012473c013 Parents: f9d75d8 5d601e8 Author: Kyle Nusbaum <knusb...@yahoo-inc.com> Authored: Wed Aug 9 16:58:59 2017 -0500 Committer: Kyle Nusbaum <knusb...@yahoo-inc.com> Committed: Wed Aug 9 16:58:59 2017 -0500 ---------------------------------------------------------------------- .travis.yml | 3 + CHANGELOG.md | 958 -------- DEVELOPER.md | 11 +- README.markdown | 9 + bin/storm.ps1 | 4 +- bin/storm.py | 60 +- conf/defaults.yaml | 12 +- conf/drpc-auth-acl.yaml.example | 28 + dev-tools/checkstyle.xslt | 40 + dev-tools/find-checkstyle-issues.py | 38 + dev-tools/jira-github-join.py | 2 +- dev-tools/jira/__init__.py | 285 --- dev-tools/jira_github/__init__.py | 285 +++ dev-tools/release_notes.py | 118 + dev-tools/report/report.py | 2 +- docs/Metrics.md | 209 +- docs/Resource_Aware_Scheduler_overview.md | 132 +- docs/SECURITY.md | 40 +- docs/STORM-UI-REST-API.md | 135 +- docs/Setting-up-a-Storm-cluster.md | 4 +- docs/State-checkpointing.md | 2 + docs/Windowing.md | 108 + docs/cgroups_in_storm.md | 2 +- docs/storm-kafka-client.md | 214 +- docs/storm-rocketmq.md | 28 +- .../storm-kafka-client-examples/README.markdown | 10 + examples/storm-kafka-client-examples/pom.xml | 8 +- .../kafka/spout/test/KafkaSpoutTestBolt.java | 49 + .../test/KafkaSpoutTopologyMainNamedTopics.java | 106 + .../KafkaSpoutTopologyMainWildcardTopics.java | 61 + .../TridentKafkaClientWordCountNamedTopics.java | 75 +- ...identKafkaClientWordCountWildcardTopics.java | 10 +- .../KafkaSpoutTopologyMainNamedTopicsLocal.java | 66 + ...fkaSpoutTopologyMainWildcardTopicsLocal.java | 29 + .../rocketmq/topology/WordCountTopology.java | 12 +- .../rocketmq/trident/WordCountTrident.java | 16 +- .../starter/PersistentWindowingTopology.java | 176 ++ .../starter/ResourceAwareExampleTopology.java | 177 +- .../storm/starter/SlidingWindowTopology.java | 1 + .../apache/storm/starter/StatefulTopology.java | 2 + .../starter/StatefulWindowingTopology.java | 1 + .../AbstractHadoopNimbusPluginAutoCreds.java | 14 +- .../apache/storm/hbase/security/AutoHBase.java | 3 +- .../storm/hbase/security/AutoHBaseCommand.java | 5 +- .../storm/hbase/security/AutoHBaseNimbus.java | 16 +- .../apache/storm/hdfs/security/AutoHDFS.java | 3 +- .../storm/hdfs/security/AutoHDFSCommand.java | 5 +- .../storm/hdfs/security/AutoHDFSNimbus.java | 16 +- .../apache/storm/hive/security/AutoHive.java | 3 +- .../storm/hive/security/AutoHiveCommand.java | 5 +- .../storm/hive/security/AutoHiveNimbus.java | 13 +- external/storm-cassandra/README.md | 9 + .../AbstractExecutionResultHandler.java | 1 + .../cassandra/BaseExecutionResultHandler.java | 1 + .../storm/cassandra/CassandraContext.java | 1 + .../cassandra/DynamicStatementBuilder.java | 1 + .../storm/cassandra/ExecutionResultHandler.java | 1 + .../storm/cassandra/Murmur3StreamGrouping.java | 3 +- .../storm/cassandra/bolt/BaseCassandraBolt.java | 1 + .../storm/cassandra/client/CassandraConf.java | 122 +- .../storm/cassandra/client/ClusterFactory.java | 20 +- external/storm-hbase/pom.xml | 2 - .../hbase/state/HBaseKeyValueStateProvider.java | 31 +- .../hive/bolt/mapper/JsonRecordHiveMapper.java | 2 +- .../apache/storm/jdbc/common/JdbcClient.java | 166 +- .../storm/jdbc/common/JdbcClientTest.java | 43 + external/storm-jms/pom.xml | 2 +- .../org/apache/storm/jms/spout/JmsSpout.java | 292 ++- .../apache/storm/jms/spout/JmsSpoutTest.java | 79 +- external/storm-kafka-client/README.md | 3 +- external/storm-kafka-client/pom.xml | 5 +- .../apache/storm/kafka/spout/KafkaSpout.java | 5 +- .../storm/kafka/spout/KafkaSpoutConfig.java | 491 ++-- .../spout/ManualPartitionNamedSubscription.java | 78 - .../ManualPartitionPatternSubscription.java | 76 - .../storm/kafka/spout/ManualPartitioner.java | 40 - .../storm/kafka/spout/NamedSubscription.java | 61 - .../storm/kafka/spout/PatternSubscription.java | 54 - .../spout/RoundRobinManualPartitioner.java | 50 - .../kafka/spout/SerializableDeserializer.java | 26 - .../apache/storm/kafka/spout/Subscription.java | 53 - .../internal/KafkaConsumerFactoryDefault.java | 3 +- .../kafka/spout/internal/OffsetManager.java | 7 +- .../ManualPartitionSubscription.java | 72 + .../spout/subscription/ManualPartitioner.java | 41 + .../spout/subscription/NamedTopicFilter.java | 67 + .../spout/subscription/PatternTopicFilter.java | 69 + .../RoundRobinManualPartitioner.java | 50 + .../kafka/spout/subscription/Subscription.java | 53 + .../kafka/spout/subscription/TopicFilter.java | 38 + .../trident/KafkaTridentSpoutBatchMetadata.java | 2 +- .../spout/trident/KafkaTridentSpoutManager.java | 3 +- .../kafka/trident/TridentKafkaStateUpdater.java | 32 + .../storm/kafka/spout/KafkaSpoutCommitTest.java | 131 ++ .../storm/kafka/spout/KafkaSpoutConfigTest.java | 8 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 55 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 42 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 117 + .../kafka/spout/MaxUncommittedOffsetTest.java | 15 +- .../kafka/spout/SingleTopicKafkaSpoutTest.java | 14 +- .../SpoutWithMockedConsumerSetupHelper.java | 73 + .../SingleTopicKafkaSpoutConfiguration.java | 65 - .../SingleTopicKafkaSpoutConfiguration.java | 66 + .../subscription/NamedTopicFilterTest.java | 68 + .../subscription/PatternTopicFilterTest.java | 73 + .../kafka/spout/test/KafkaSpoutTestBolt.java | 49 - .../test/KafkaSpoutTopologyMainNamedTopics.java | 114 - .../KafkaSpoutTopologyMainWildcardTopics.java | 57 - .../apache/storm/kafka/PartitionManager.java | 4 +- .../org/apache/storm/kafka/KafkaTestBroker.java | 101 +- .../storm/kafka/PartitionManagerTest.java | 243 ++ .../spout/ExponentialBackoffRetrier.java | 10 +- .../kinesis/spout/KinesisRecordsManager.java | 18 +- external/storm-mongodb/README.md | 1 + external/storm-mongodb/pom.xml | 5 +- .../storm/mongodb/bolt/AbstractMongoBolt.java | 22 +- .../storm/mongodb/bolt/MongoInsertBolt.java | 19 +- .../storm/mongodb/bolt/MongoLookupBolt.java | 15 +- .../storm/mongodb/bolt/MongoUpdateBolt.java | 12 +- .../storm/mongodb/common/MongoDBClient.java | 103 - .../storm/mongodb/common/MongoDbClient.java | 109 + .../apache/storm/mongodb/common/MongoUtils.java | 10 +- .../mongodb/common/QueryFilterCreator.java | 9 +- .../common/SimpleQueryFilterCreator.java | 9 +- .../common/mapper/MongoLookupMapper.java | 9 +- .../mongodb/common/mapper/MongoMapper.java | 5 +- .../common/mapper/MongoUpdateMapper.java | 7 +- .../common/mapper/SimpleMongoLookupMapper.java | 12 +- .../common/mapper/SimpleMongoMapper.java | 9 +- .../common/mapper/SimpleMongoUpdateMapper.java | 5 +- .../mongodb/trident/state/MongoMapState.java | 40 +- .../storm/mongodb/trident/state/MongoState.java | 21 +- .../trident/state/MongoStateFactory.java | 1 + .../mongodb/trident/state/MongoStateQuery.java | 5 +- .../trident/state/MongoStateUpdater.java | 1 + external/storm-pmml/pom.xml | 2 +- .../redis/state/RedisKeyValueStateProvider.java | 47 +- external/storm-rocketmq/README.md | 28 +- external/storm-rocketmq/pom.xml | 3 - .../apache/storm/rocketmq/ConsumerMessage.java | 1 + .../rocketmq/DefaultMessageBodySerializer.java | 3 +- .../rocketmq/DefaultMessageRetryManager.java | 20 +- .../storm/rocketmq/MessageBodySerializer.java | 5 +- .../storm/rocketmq/MessageRetryManager.java | 13 +- .../apache/storm/rocketmq/RocketMQConfig.java | 162 -- .../apache/storm/rocketmq/RocketMQUtils.java | 64 - .../apache/storm/rocketmq/RocketMqConfig.java | 178 ++ .../apache/storm/rocketmq/RocketMqUtils.java | 76 + .../org/apache/storm/rocketmq/SpoutConfig.java | 3 +- .../storm/rocketmq/bolt/RocketMQBolt.java | 160 -- .../storm/rocketmq/bolt/RocketMqBolt.java | 160 ++ .../FieldNameBasedTupleToMessageMapper.java | 14 +- .../common/mapper/TupleToMessageMapper.java | 6 +- .../common/selector/DefaultTopicSelector.java | 5 +- .../selector/FieldNameBasedTopicSelector.java | 9 +- .../rocketmq/common/selector/TopicSelector.java | 6 +- .../storm/rocketmq/spout/RocketMQSpout.java | 218 -- .../storm/rocketmq/spout/RocketMqSpout.java | 223 ++ .../rocketmq/spout/scheme/KeyValueScheme.java | 4 +- .../spout/scheme/StringKeyValueScheme.java | 6 +- .../rocketmq/spout/scheme/StringScheme.java | 23 +- .../rocketmq/trident/state/RocketMQState.java | 117 - .../trident/state/RocketMQStateFactory.java | 42 - .../trident/state/RocketMQStateUpdater.java | 34 - .../rocketmq/trident/state/RocketMqState.java | 123 + .../trident/state/RocketMqStateFactory.java | 43 + .../trident/state/RocketMqStateUpdater.java | 35 + .../storm/rocketmq/TestMessageRetryManager.java | 15 +- .../apache/storm/flux/parser/FluxParser.java | 11 +- .../java/org/apache/storm/flux/TCKTest.java | 9 + .../resources/configs/substitution-test.yaml | 3 +- .../src/test/resources/configs/test.properties | 1 + integration-test/run-it.sh | 2 +- .../org/apache/storm/st/wrapper/TopoWrap.java | 4 +- pom.xml | 16 +- sql/storm-sql-external/storm-sql-hdfs/pom.xml | 5 +- .../storm/sql/hdfs/HdfsDataSourcesProvider.java | 150 +- sql/storm-sql-external/storm-sql-kafka/pom.xml | 3 - .../sql/kafka/KafkaDataSourcesProvider.java | 223 +- .../storm-sql-mongodb/pom.xml | 5 +- .../sql/mongodb/MongoDataSourcesProvider.java | 136 +- .../mongodb/TestMongoDataSourcesProvider.java | 4 +- sql/storm-sql-external/storm-sql-redis/pom.xml | 3 - .../sql/redis/RedisDataSourcesProvider.java | 292 +-- sql/storm-sql-runtime/pom.xml | 3 - .../calcite/interpreter/StormContext.java | 5 +- .../sql/runtime/AbstractChannelHandler.java | 43 +- .../sql/runtime/AbstractValuesProcessor.java | 23 +- .../storm/sql/runtime/ChannelContext.java | 18 +- .../storm/sql/runtime/ChannelHandler.java | 21 +- .../org/apache/storm/sql/runtime/Channels.java | 149 +- .../apache/storm/sql/runtime/DataSource.java | 3 +- .../storm/sql/runtime/DataSourcesProvider.java | 42 +- .../storm/sql/runtime/DataSourcesRegistry.java | 96 +- .../org/apache/storm/sql/runtime/FieldInfo.java | 43 +- .../storm/sql/runtime/IOutputSerializer.java | 15 +- .../sql/runtime/ISqlTridentDataSource.java | 72 +- .../sql/runtime/SimpleSqlTridentConsumer.java | 1 + .../storm/sql/runtime/StormSqlFunctions.java | 34 +- .../runtime/calcite/ExecutableExpression.java | 5 +- .../sql/runtime/calcite/StormDataContext.java | 13 +- .../socket/SocketDataSourcesProvider.java | 13 +- .../datasource/socket/trident/SocketState.java | 12 +- .../socket/trident/SocketStateUpdater.java | 6 +- .../socket/trident/TridentSocketSpout.java | 33 +- .../sql/runtime/serde/avro/AvroScheme.java | 74 +- .../sql/runtime/serde/avro/AvroSerializer.java | 75 +- .../sql/runtime/serde/avro/CachedSchemas.java | 12 +- .../storm/sql/runtime/serde/csv/CsvScheme.java | 60 +- .../sql/runtime/serde/csv/CsvSerializer.java | 39 +- .../sql/runtime/serde/json/JsonScheme.java | 56 +- .../sql/runtime/serde/json/JsonSerializer.java | 46 +- .../storm/sql/runtime/serde/tsv/TsvScheme.java | 54 +- .../sql/runtime/serde/tsv/TsvSerializer.java | 41 +- .../trident/functions/EvaluationCalc.java | 17 +- .../trident/functions/EvaluationFilter.java | 10 +- .../trident/functions/EvaluationFunction.java | 11 +- .../trident/functions/ForwardFunction.java | 1 + .../storm/sql/runtime/utils/FieldInfoUtils.java | 4 +- .../storm/sql/runtime/utils/SerdeUtils.java | 41 +- .../apache/storm/sql/runtime/utils/Utils.java | 21 +- storm-buildtools/storm_checkstyle.xml | 6 + storm-client-misc/pom.xml | 2 +- .../src/jvm/org/apache/storm/Config.java | 59 +- .../jvm/org/apache/storm/StormSubmitter.java | 63 +- .../storm/cluster/StormClusterStateImpl.java | 21 +- .../coordination/BatchSubtopologyBuilder.java | 19 +- .../org/apache/storm/daemon/StormCommon.java | 6 +- .../daemon/metrics/ErrorReportingMetrics.java | 38 + .../storm/daemon/supervisor/AdvancedFSOps.java | 51 +- .../supervisor/ClientSupervisorUtils.java | 8 +- .../storm/daemon/supervisor/IAdvancedFSOps.java | 10 +- .../storm/drpc/LinearDRPCTopologyBuilder.java | 26 +- .../jvm/org/apache/storm/executor/Executor.java | 9 + .../storm/executor/bolt/BoltExecutor.java | 2 + .../executor/bolt/BoltOutputCollectorImpl.java | 1 + .../storm/executor/spout/SpoutExecutor.java | 1 + .../spout/SpoutOutputCollectorImpl.java | 1 + .../org/apache/storm/generated/Assignment.java | 519 +++-- .../org/apache/storm/generated/BoltStats.java | 440 ++-- .../apache/storm/generated/ClusterSummary.java | 108 +- .../storm/generated/ClusterWorkerHeartbeat.java | 52 +- .../storm/generated/CommonAggregateStats.java | 44 +- .../storm/generated/ComponentPageInfo.java | 264 +-- .../org/apache/storm/generated/Credentials.java | 44 +- .../apache/storm/generated/ExecutorStats.java | 168 +- .../jvm/org/apache/storm/generated/HBNodes.java | 32 +- .../org/apache/storm/generated/HBRecords.java | 36 +- .../storm/generated/LSApprovedWorkers.java | 44 +- .../generated/LSSupervisorAssignments.java | 48 +- .../apache/storm/generated/LSTopoHistory.java | 64 +- .../storm/generated/LSTopoHistoryList.java | 36 +- .../storm/generated/LSWorkerHeartbeat.java | 36 +- .../apache/storm/generated/ListBlobsResult.java | 32 +- .../apache/storm/generated/LocalAssignment.java | 253 ++- .../apache/storm/generated/LocalStateData.java | 48 +- .../org/apache/storm/generated/LogConfig.java | 48 +- .../jvm/org/apache/storm/generated/Nimbus.java | 2059 ++++++++++++----- .../org/apache/storm/generated/NodeInfo.java | 32 +- .../storm/generated/OwnerResourceSummary.java | 2097 ++++++++++++++++++ .../storm/generated/RebalanceOptions.java | 44 +- .../storm/generated/SettableBlobMeta.java | 36 +- .../apache/storm/generated/SharedMemory.java | 711 ++++++ .../org/apache/storm/generated/SpoutStats.java | 252 +-- .../org/apache/storm/generated/StormBase.java | 312 ++- .../apache/storm/generated/StormTopology.java | 531 ++++- .../apache/storm/generated/SupervisorInfo.java | 152 +- .../storm/generated/SupervisorPageInfo.java | 72 +- .../storm/generated/SupervisorSummary.java | 44 +- .../storm/generated/TopologyHistoryInfo.java | 32 +- .../apache/storm/generated/TopologyInfo.java | 160 +- .../storm/generated/TopologyPageInfo.java | 1062 ++++++++- .../apache/storm/generated/TopologyStats.java | 220 +- .../apache/storm/generated/TopologySummary.java | 146 +- .../apache/storm/generated/WorkerResources.java | 206 +- .../apache/storm/generated/WorkerSummary.java | 44 +- .../storm/metric/cgroup/CGroupMemoryLimit.java | 16 + .../apache/storm/pacemaker/PacemakerClient.java | 28 +- .../storm/pacemaker/codec/ThriftDecoder.java | 31 +- .../pacemaker/codec/ThriftNettyClientCodec.java | 7 +- .../jvm/org/apache/storm/scheduler/Cluster.java | 837 ------- .../jvm/org/apache/storm/scheduler/INimbus.java | 49 - .../org/apache/storm/scheduler/IScheduler.java | 40 - .../org/apache/storm/scheduler/ISupervisor.java | 45 - .../storm/scheduler/SchedulerAssignment.java | 29 +- .../scheduler/SchedulerAssignmentImpl.java | 141 +- .../storm/scheduler/SupervisorDetails.java | 4 +- .../org/apache/storm/scheduler/Topologies.java | 87 - .../apache/storm/scheduler/TopologyDetails.java | 517 ----- .../org/apache/storm/scheduler/WorkerSlot.java | 43 +- .../storm/scheduler/resource/Component.java | 54 - .../storm/scheduler/resource/RAS_Node.java | 529 ----- .../storm/scheduler/resource/RAS_Nodes.java | 138 -- .../storm/scheduler/resource/ResourceUtils.java | 207 -- .../scheduler/resource/SchedulingResult.java | 116 - .../scheduler/resource/SchedulingState.java | 56 - .../scheduler/resource/SchedulingStatus.java | 40 - .../apache/storm/scheduler/resource/User.java | 350 --- .../DefaultResourceAwareStrategy.java | 757 ------- .../strategies/scheduling/IStrategy.java | 47 - .../storm/security/INimbusCredentialPlugin.java | 25 +- .../org/apache/storm/security/auth/AutoSSL.java | 8 +- .../security/auth/DefaultPrincipalToLocal.java | 5 +- .../security/auth/ICredentialsRenewer.java | 20 +- .../storm/security/auth/IPrincipalToLocal.java | 6 +- .../security/auth/KerberosPrincipalToLocal.java | 5 +- .../auth/authorizer/SimpleACLAuthorizer.java | 65 +- .../storm/security/auth/kerberos/AutoTGT.java | 2 +- .../serialization/SerializationFactory.java | 63 +- .../apache/storm/state/DefaultStateEncoder.java | 5 +- .../storm/state/DefaultStateSerializer.java | 64 +- .../topology/BaseConfigurationDeclarer.java | 9 +- .../ComponentConfigurationDeclarer.java | 36 + .../storm/topology/IStatefulWindowedBolt.java | 22 +- .../PersistentWindowedBoltExecutor.java | 256 +++ .../apache/storm/topology/ResourceDeclarer.java | 30 + .../storm/topology/SharedOffHeapWithinNode.java | 38 + .../topology/SharedOffHeapWithinWorker.java | 36 + .../org/apache/storm/topology/SharedOnHeap.java | 38 + .../storm/topology/StatefulBoltExecutor.java | 7 +- .../apache/storm/topology/TopologyBuilder.java | 54 +- .../storm/topology/WindowedBoltExecutor.java | 86 +- .../topology/base/BaseStatefulWindowedBolt.java | 39 + .../TransactionalTopologyBuilder.java | 47 +- .../jvm/org/apache/storm/trident/Stream.java | 7 + .../org/apache/storm/trident/TridentState.java | 7 + .../apache/storm/trident/TridentTopology.java | 5 + .../org/apache/storm/trident/graph/Group.java | 12 + .../operation/DefaultResourceDeclarer.java | 24 +- .../trident/operation/ITridentResource.java | 8 + .../topology/TridentTopologyBuilder.java | 46 +- .../windowing/AbstractTridentWindowManager.java | 4 +- .../strategy/SlidingCountWindowStrategy.java | 4 +- .../strategy/SlidingDurationWindowStrategy.java | 4 +- .../strategy/TumblingCountWindowStrategy.java | 4 +- .../TumblingDurationWindowStrategy.java | 4 +- .../windowing/strategy/WindowStrategy.java | 4 +- .../org/apache/storm/utils/DisruptorQueue.java | 14 +- .../apache/storm/utils/ThriftTopologyUtils.java | 40 +- .../src/jvm/org/apache/storm/utils/Utils.java | 2 +- .../storm/windowing/CountEvictionPolicy.java | 17 +- .../storm/windowing/CountTriggerPolicy.java | 24 +- .../jvm/org/apache/storm/windowing/Event.java | 2 +- .../apache/storm/windowing/EvictionPolicy.java | 35 +- .../storm/windowing/StatefulWindowManager.java | 164 ++ .../storm/windowing/TimeEvictionPolicy.java | 25 +- .../storm/windowing/TimeTriggerPolicy.java | 16 +- .../apache/storm/windowing/TriggerPolicy.java | 20 +- .../storm/windowing/TupleWindowIterImpl.java | 80 + .../windowing/WatermarkCountEvictionPolicy.java | 61 +- .../windowing/WatermarkCountTriggerPolicy.java | 26 +- .../windowing/WatermarkTimeEvictionPolicy.java | 1 + .../windowing/WatermarkTimeTriggerPolicy.java | 26 +- .../jvm/org/apache/storm/windowing/Window.java | 31 +- .../windowing/WindowLifecycleListener.java | 19 +- .../apache/storm/windowing/WindowManager.java | 51 +- .../persistence/SimpleWindowPartitionCache.java | 203 ++ .../persistence/WindowPartitionCache.java | 142 ++ .../windowing/persistence/WindowState.java | 424 ++++ storm-client/src/py/storm/Nimbus-remote | 7 + storm-client/src/py/storm/Nimbus.py | 229 +- storm-client/src/py/storm/ttypes.py | 1931 ++++++++++------ storm-client/src/storm.thrift | 50 + .../org/apache/storm/scheduler/ClusterTest.java | 111 - .../storm/security/auth/AuthUtilsTestMock.java | 6 +- .../authorizer/SimpleACLAuthorizerTest.java | 388 ++++ .../storm/state/DefaultStateSerializerTest.java | 3 +- .../PersistentWindowedBoltExecutorTest.java | 299 +++ .../SimpleWindowPartitionCacheTest.java | 233 ++ .../storm/windowing/WindowManagerTest.java | 46 +- .../windowing/persistence/WindowStateTest.java | 246 ++ .../clj/org/apache/storm/daemon/logviewer.clj | 1254 ----------- storm-core/src/clj/org/apache/storm/ui/core.clj | 147 +- .../dev/logviewer-search-context-tests.log.gz | Bin 72 -> 0 bytes .../dev/logviewer-search-context-tests.log.test | 1 - storm-core/src/dev/small-worker.log.test | 1 - storm-core/src/dev/test-3072.log.test | 3 - storm-core/src/dev/test-worker.log.test | 380 ---- .../daemon/ClientJarTransformerRunner.java | 49 - .../apache/storm/daemon/DirectoryCleaner.java | 174 -- .../org/apache/storm/daemon/JarTransformer.java | 31 - storm-core/src/ui/public/component.html | 22 +- storm-core/src/ui/public/css/style.css | 25 + .../src/ui/public/deep_search_result.html | 24 +- storm-core/src/ui/public/index.html | 18 + storm-core/src/ui/public/js/script.js | 74 + storm-core/src/ui/public/logviewer_search.html | 8 +- storm-core/src/ui/public/owner.html | 211 ++ storm-core/src/ui/public/search_result.html | 10 +- storm-core/src/ui/public/supervisor.html | 6 + .../deep-search-result-page-template.html | 8 +- .../public/templates/index-page-template.html | 73 +- .../public/templates/owner-page-template.html | 239 ++ .../templates/topology-page-template.html | 108 +- storm-core/src/ui/public/topology.html | 7 + .../apache/storm/trident/integration_test.clj | 2 +- .../clj/org/apache/storm/logviewer_test.clj | 824 ------- .../test/clj/org/apache/storm/nimbus_test.clj | 161 +- .../scheduler/multitenant_scheduler_test.clj | 160 +- .../clj/org/apache/storm/scheduler_test.clj | 29 +- .../test/jvm/org/apache/storm/MockAutoCred.java | 4 +- .../final-package/src/main/assembly/binary.xml | 6 +- .../src/main/resources/resources/storm.py | 2 +- storm-rename-hack/pom.xml | 131 -- .../org/apache/storm/hack/DefaultShader.java | 391 ---- .../main/java/org/apache/storm/hack/IOUtil.java | 41 - .../org/apache/storm/hack/ShadeRequest.java | 69 - .../apache/storm/hack/StormShadeRequest.java | 41 - .../storm/hack/StormShadeTransformer.java | 37 - .../apache/storm/hack/relocation/Relocator.java | 40 - .../storm/hack/relocation/SimpleRelocator.java | 97 - .../storm/hack/resource/ClojureTransformer.java | 71 - .../hack/resource/ResourceTransformer.java | 46 - .../java/org/apache/storm/DaemonConfig.java | 102 +- .../java/org/apache/storm/LocalCluster.java | 39 +- .../apache/storm/blobstore/BlobStoreUtils.java | 4 +- .../container/ResourceIsolationInterface.java | 51 +- .../storm/container/cgroup/CgroupManager.java | 161 +- .../reporters/JmxPreparableReporter.java | 1 - .../org/apache/storm/daemon/nimbus/Nimbus.java | 381 +++- .../storm/daemon/nimbus/TopologyResources.java | 269 ++- .../storm/daemon/supervisor/BasicContainer.java | 203 +- .../storm/daemon/supervisor/Container.java | 151 +- .../daemon/supervisor/ReadClusterState.java | 20 +- .../apache/storm/daemon/supervisor/Slot.java | 73 +- .../storm/daemon/supervisor/Supervisor.java | 16 +- .../daemon/supervisor/SupervisorUtils.java | 7 +- .../daemon/supervisor/timer/UpdateBlobs.java | 13 +- .../apache/storm/localizer/AsyncLocalizer.java | 43 +- .../logging/filters/AccessLoggingFilter.java | 36 +- .../apache/storm/pacemaker/PacemakerServer.java | 6 +- .../pacemaker/codec/ThriftNettyServerCodec.java | 8 +- .../org/apache/storm/scheduler/Cluster.java | 1050 +++++++++ .../org/apache/storm/scheduler/Component.java | 88 + .../storm/scheduler/DefaultScheduler.java | 13 +- .../apache/storm/scheduler/EvenScheduler.java | 30 +- .../org/apache/storm/scheduler/INimbus.java | 47 + .../org/apache/storm/scheduler/IScheduler.java | 47 + .../storm/scheduler/ISchedulingState.java | 275 +++ .../org/apache/storm/scheduler/ISupervisor.java | 50 + .../storm/scheduler/IsolationScheduler.java | 5 + .../storm/scheduler/SingleTopologyCluster.java | 46 + .../org/apache/storm/scheduler/Topologies.java | 139 ++ .../apache/storm/scheduler/TopologyDetails.java | 573 +++++ .../multitenant/MultitenantScheduler.java | 7 +- .../storm/scheduler/resource/RAS_Node.java | 465 ++++ .../storm/scheduler/resource/RAS_Nodes.java | 152 ++ .../resource/ResourceAwareScheduler.java | 456 ++-- .../storm/scheduler/resource/ResourceUtils.java | 175 ++ .../scheduler/resource/SchedulingResult.java | 88 + .../scheduler/resource/SchedulingStatus.java | 41 + .../apache/storm/scheduler/resource/User.java | 228 ++ .../eviction/DefaultEvictionStrategy.java | 68 +- .../strategies/eviction/IEvictionStrategy.java | 17 +- .../DefaultSchedulingPriorityStrategy.java | 47 +- .../priority/ISchedulingPriorityStrategy.java | 13 +- .../DefaultResourceAwareStrategy.java | 741 +++++++ .../strategies/scheduling/IStrategy.java | 48 + .../apache/storm/utils/ServerConfigUtils.java | 3 +- .../org/apache/storm/utils/ServerUtils.java | 2 +- .../java/org/apache/storm/LocalStateTest.java | 2 +- .../java/org/apache/storm/MessagingTest.java | 3 - .../test/java/org/apache/storm/TestCgroups.java | 5 +- .../apache/storm/blobstore/BlobStoreTest.java | 2 +- .../apache/storm/daemon/nimbus/NimbusTest.java | 56 + .../daemon/supervisor/BasicContainerTest.java | 2 + .../storm/daemon/supervisor/ContainerTest.java | 6 +- .../storm/localizer/AsyncLocalizerTest.java | 8 +- .../org/apache/storm/scheduler/ClusterTest.java | 111 + .../resource/TestResourceAwareScheduler.java | 923 ++------ .../storm/scheduler/resource/TestUser.java | 95 +- .../TestUtilsForResourceAwareScheduler.java | 159 +- .../eviction/TestDefaultEvictionStrategy.java | 793 ++----- .../TestDefaultResourceAwareStrategy.java | 214 +- storm-webapp/pom.xml | 51 +- .../common/AuthorizationExceptionMapper.java | 40 + .../daemon/common/JsonResponseBuilder.java | 74 + .../webapp/AuthorizationExceptionMapper.java | 40 - .../daemon/drpc/webapp/DRPCApplication.java | 1 + .../daemon/logviewer/LogviewerConstant.java | 29 + .../storm/daemon/logviewer/LogviewerServer.java | 172 ++ .../handler/LogviewerLogDownloadHandler.java | 71 + .../handler/LogviewerLogPageHandler.java | 470 ++++ .../handler/LogviewerLogSearchHandler.java | 753 +++++++ .../handler/LogviewerProfileHandler.java | 140 ++ .../logviewer/utils/DirectoryCleaner.java | 196 ++ .../daemon/logviewer/utils/LogCleaner.java | 267 +++ .../logviewer/utils/LogFileDownloader.java | 67 + .../utils/LogviewerResponseBuilder.java | 157 ++ .../logviewer/utils/ResourceAuthorizer.java | 158 ++ .../daemon/logviewer/utils/WorkerLogs.java | 219 ++ .../logviewer/webapp/LogviewerApplication.java | 102 + .../logviewer/webapp/LogviewerResource.java | 269 +++ .../daemon/utils/ListFunctionalSupport.java | 108 + .../apache/storm/daemon/utils/StreamUtil.java | 47 + .../apache/storm/daemon/utils/UrlBuilder.java | 53 + .../storm/daemon/drpc/DRPCServerTest.java | 13 +- .../handler/LogviewerLogPageHandlerTest.java | 105 + .../handler/LogviewerLogSearchHandlerTest.java | 856 +++++++ .../testsupport/ArgumentsVerifier.java | 43 + .../testsupport/MockDirectoryBuilder.java | 72 + .../logviewer/testsupport/MockFileBuilder.java | 72 + .../daemon/logviewer/utils/LogCleanerTest.java | 350 +++ .../logviewer/utils/ResourceAuthorizerTest.java | 182 ++ .../daemon/logviewer/utils/WorkerLogsTest.java | 75 + .../logviewer-search-context-tests.log.gz | Bin 0 -> 72 bytes .../logviewer-search-context-tests.log.test | 1 + .../src/test/resources/small-worker.log.test | 1 + .../src/test/resources/test-3072.log.test | 3 + .../src/test/resources/test-worker.log.test | 380 ++++ 510 files changed, 33466 insertions(+), 18609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/832395a1/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/832395a1/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java ---------------------------------------------------------------------- diff --cc storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 6ceb0cd,83231cd..19838f4 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@@ -23,9 -31,8 +31,9 @@@ import org.apache.storm.scheduler.Topol import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy; import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; - import org.apache.storm.utils.Utils; import org.apache.storm.utils.ReflectionUtils; +import org.apache.storm.utils.DisallowedStrategyException; + import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -99,253 -80,94 +81,101 @@@ public class ResourceAwareScheduler imp if (td == null) { break; } - scheduleTopology(td); - - LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes); + User submitter = userMap.get(td.getTopologySubmitter()); + if (cluster.needsSchedulingRas(td)) { + scheduleTopology(td, cluster, submitter, userMap); + } else { + LOG.warn("Topology {} is already fully scheduled!", td.getName()); + cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled"); + } } - - //update changes to cluster - updateChanges(cluster, topologies); - } - - private void updateChanges(Cluster cluster, Topologies topologies) { - //Cannot simply set this.cluster=schedulingState.cluster since clojure is immutable - cluster.setAssignments(schedulingState.cluster.getAssignments()); - cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts()); - cluster.setStatusMap(schedulingState.cluster.getStatusMap()); - cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap()); - cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap()); - cluster.setWorkerResourcesMap(schedulingState.cluster.getWorkerResourcesMap()); - //updating resources used by supervisor - updateSupervisorsResources(cluster, topologies); - } - - private void handleSchedulingError(TopologyDetails td, SchedulingState schedulingState, Exception e) { - LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.", - td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e.getMessage(), td.getName()); - User topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToInvalid(td); } - public void scheduleTopology(TopologyDetails td) { - User topologySubmitter = this.schedulingState.userMap.get(td.getTopologySubmitter()); - if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 0) { - LOG.debug("/********Scheduling topology {} from User {}************/", td.getName(), topologySubmitter); + - SchedulingState schedulingState = checkpointSchedulingState(); - IStrategy rasStrategy = null; + public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, + Map<String, User> userMap) { + //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds + Cluster workingState = new Cluster(cluster); + IStrategy rasStrategy = null; + String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); + try { - rasStrategy = (IStrategy) ReflectionUtils.newInstance(strategyConf); ++ rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); + rasStrategy.prepare(conf); ++ } catch (DisallowedStrategyException e) { ++ topologySubmitter.markTopoUnsuccess(td); ++ cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass() ++ + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY ++ + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); ++ return; + } catch (RuntimeException e) { + LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.", + strategyConf, td.getName(), e); + topologySubmitter.markTopoUnsuccess(td); + cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " + + strategyConf + ". Please check logs for details"); + return; + } + + while (true) { + // A copy of the cluster that restricts the strategy to only modify a single topology + SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId()); + SchedulingResult result = null; try { - rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); - } catch (DisallowedStrategyException e) { - handleSchedulingError(td, schedulingState, e); - this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); - return; - } catch (RuntimeException e) { - handleSchedulingError(td, schedulingState, e); - this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " - + td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details"); - return; + result = rasStrategy.schedule(toSchedule, td); + } catch (Exception ex) { + LOG.error("Exception thrown when running strategy {} to schedule topology {}." + + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex); + topologySubmitter.markTopoUnsuccess(td); + cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}" + + rasStrategy.getClass().getName() + ". Please check logs for details"); } - IEvictionStrategy evictionStrategy = null; - while (true) { - SchedulingResult result = null; - try { - // Need to re prepare scheduling strategy with cluster and topologies in case scheduling state was restored - // Pass in a copy of scheduling state since the scheduling strategy should not be able to be able to make modifications to - // the state of cluster directly - rasStrategy.prepare(new SchedulingState(this.schedulingState)); - result = rasStrategy.schedule(td); - } catch (Exception ex) { - LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!" - , rasStrategy.getClass().getName(), td.getName()), ex); - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToInvalid(td); - this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}" - + rasStrategy.getClass().getName() + ". Please check logs for details"); - } - LOG.debug("scheduling result: {}", result); - if (result != null && result.isValid()) { - if (result.isSuccess()) { + LOG.debug("scheduling result: {}", result); + if (result != null) { + if (result.isSuccess()) { + try { + cluster.updateFrom(toSchedule); + cluster.setStatus(td.getId(), "Running - " + result.getMessage()); + } catch (Exception ex) { + LOG.error("Unsuccessful attempting to assign executors to nodes.", ex); + topologySubmitter.markTopoUnsuccess(td); + cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + + "IllegalStateException thrown when attempting to assign executors to nodes. Please check" + + " log for details."); + } + return; + } else { + if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { + boolean madeSpace = false; try { - if (mkAssignment(td, result.getSchedulingResultMap())) { - topologySubmitter.moveTopoFromPendingToRunning(td); - this.schedulingState.cluster.setStatus(td.getId(), "Running - " + result.getMessage()); - } else { - topologySubmitter = this.cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToAttempted(td); - this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details"); - } - } catch (IllegalStateException ex) { - LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", ex); - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToAttempted(td); - this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details."); + //need to re prepare since scheduling state might have been restored + madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap); + } catch (Exception ex) { + LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}." + + " No evictions will be done!", evictionStrategy.getClass().getName(), + td.getName(), ex); + topologySubmitter.markTopoUnsuccess(td); + return; } - break; - } else { - if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { - if (evictionStrategy == null) { - try { - evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance((String) this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY)); - } catch (RuntimeException e) { - LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.", - this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e.getMessage()); - topologySubmitter.moveTopoFromPendingToAttempted(td); - break; - } - } - boolean madeSpace = false; - try { - //need to re prepare since scheduling state might have been restored - evictionStrategy.prepare(this.schedulingState); - madeSpace = evictionStrategy.makeSpaceForTopo(td); - } catch (Exception ex) { - LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s" - , evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), ex); - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToAttempted(td); - break; - } - if (!madeSpace) { - LOG.debug("Could not make space for topo {} will move to attempted", td); - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToAttempted(td); - this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to schedule - " + result.getErrorMessage()); - break; - } - continue; - } else if (result.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) { - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster); - break; - } else { - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToAttempted(td, this.schedulingState.cluster); - break; + if (!madeSpace) { + LOG.debug("Could not make space for topo {} will move to attempted", td); + topologySubmitter.markTopoUnsuccess(td); + cluster.setStatus(td.getId(), "Not enough resources to schedule - " + + result.getErrorMessage()); + return; } + continue; + } else { + topologySubmitter.markTopoUnsuccess(td, cluster); + return; } - } else { - LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", td.getName()); - topologySubmitter = cleanup(schedulingState, td); - topologySubmitter.moveTopoFromPendingToInvalid(td, this.schedulingState.cluster); - break; } + } else { + LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", + td.getName()); + topologySubmitter.markTopoUnsuccess(td, cluster); + return; } - } else { - LOG.warn("Topology {} is already fully scheduled!", td.getName()); - topologySubmitter.moveTopoFromPendingToRunning(td); - if (this.schedulingState.cluster.getStatusMap().get(td.getId()) == null || this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) { - this.schedulingState.cluster.setStatus(td.getId(), "Fully Scheduled"); - } - } - } - - private User cleanup(SchedulingState schedulingState, TopologyDetails td) { - restoreCheckpointSchedulingState(schedulingState); - //since state is restored need the update User topologySubmitter to the new User object in userMap - return this.schedulingState.userMap.get(td.getTopologySubmitter()); - } - - private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, Collection<ExecutorDetails>> schedulerAssignmentMap) { - if (schedulerAssignmentMap != null) { - double requestedMemOnHeap = td.getTotalRequestedMemOnHeap(); - double requestedMemOffHeap = td.getTotalRequestedMemOffHeap(); - double requestedCpu = td.getTotalRequestedCpu(); - double assignedMemOnHeap = 0.0; - double assignedMemOffHeap = 0.0; - double assignedCpu = 0.0; - - Map<WorkerSlot, Double[]> workerResources = new HashMap<WorkerSlot, Double[]>(); - - Set<String> nodesUsed = new HashSet<String>(); - for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> workerToTasksEntry : schedulerAssignmentMap.entrySet()) { - WorkerSlot targetSlot = workerToTasksEntry.getKey(); - Collection<ExecutorDetails> execsNeedScheduling = workerToTasksEntry.getValue(); - RAS_Node targetNode = this.schedulingState.nodes.getNodeById(targetSlot.getNodeId()); - - targetSlot = allocateResourceToSlot(td, execsNeedScheduling, targetSlot); - - targetNode.assign(targetSlot, td, execsNeedScheduling); - - LOG.debug("ASSIGNMENT TOPOLOGY: {} TASKS: {} To Node: {} on Slot: {}", - td.getName(), execsNeedScheduling, targetNode.getHostname(), targetSlot.getPort()); - - for (ExecutorDetails exec : execsNeedScheduling) { - targetNode.consumeResourcesforTask(exec, td); - } - if (!nodesUsed.contains(targetNode.getId())) { - nodesUsed.add(targetNode.getId()); - } - assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap(); - assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap(); - assignedCpu += targetSlot.getAllocatedCpu(); - - Double[] worker_resources = { - requestedMemOnHeap, requestedMemOffHeap, requestedCpu, - targetSlot.getAllocatedMemOnHeap(), targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()}; - workerResources.put (targetSlot, worker_resources); - } - - Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, requestedCpu, - assignedMemOnHeap, assignedMemOffHeap, assignedCpu}; - LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} " + - "assigned on-heap mem, off-heap mem, cpu: {} {} {}", - td.getId(), requestedMemOnHeap, requestedMemOffHeap, requestedCpu, - assignedMemOnHeap, assignedMemOffHeap, assignedCpu); - //updating resources used for a topology - this.schedulingState.cluster.setTopologyResources(td.getId(), resources); - this.schedulingState.cluster.setWorkerResources(td.getId(), workerResources); - return true; - } else { - LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", td.getName()); - return false; - } - } - - private WorkerSlot allocateResourceToSlot (TopologyDetails td, Collection<ExecutorDetails> executors, WorkerSlot slot) { - double onHeapMem = 0.0; - double offHeapMem = 0.0; - double cpu = 0.0; - for (ExecutorDetails exec : executors) { - Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec); - if (onHeapMemForExec != null) { - onHeapMem += onHeapMemForExec; - } - Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec); - if (offHeapMemForExec != null) { - offHeapMem += offHeapMemForExec; - } - Double cpuForExec = td.getTotalCpuReqTask(exec); - if (cpuForExec != null) { - cpu += cpuForExec; - } - } - return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, offHeapMem, cpu); - } - - private void updateSupervisorsResources(Cluster cluster, Topologies topologies) { - Map<String, Double[]> supervisors_resources = new HashMap<String, Double[]>(); - Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, topologies); - for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) { - RAS_Node node = entry.getValue(); - Double totalMem = node.getTotalMemoryResources(); - Double totalCpu = node.getTotalCpuResources(); - Double usedMem = totalMem - node.getAvailableMemoryResources(); - Double usedCpu = totalCpu - node.getAvailableCpuResources(); - Double[] resources = {totalMem, totalCpu, usedMem, usedCpu}; - supervisors_resources.put(entry.getKey(), resources); } - cluster.setSupervisorsResourcesMap(supervisors_resources); - } - - public User getUser(String user) { - return this.schedulingState.userMap.get(user); - } - - public Map<String, User> getUserMap() { - return this.schedulingState.userMap; } /**