Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/storm into 
STORM-2623


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/832395a1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/832395a1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/832395a1

Branch: refs/heads/master
Commit: 832395a1a88d31e6d40db9326fd86c012473c013
Parents: f9d75d8 5d601e8
Author: Kyle Nusbaum <knusb...@yahoo-inc.com>
Authored: Wed Aug 9 16:58:59 2017 -0500
Committer: Kyle Nusbaum <knusb...@yahoo-inc.com>
Committed: Wed Aug 9 16:58:59 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                     |    3 +
 CHANGELOG.md                                    |  958 --------
 DEVELOPER.md                                    |   11 +-
 README.markdown                                 |    9 +
 bin/storm.ps1                                   |    4 +-
 bin/storm.py                                    |   60 +-
 conf/defaults.yaml                              |   12 +-
 conf/drpc-auth-acl.yaml.example                 |   28 +
 dev-tools/checkstyle.xslt                       |   40 +
 dev-tools/find-checkstyle-issues.py             |   38 +
 dev-tools/jira-github-join.py                   |    2 +-
 dev-tools/jira/__init__.py                      |  285 ---
 dev-tools/jira_github/__init__.py               |  285 +++
 dev-tools/release_notes.py                      |  118 +
 dev-tools/report/report.py                      |    2 +-
 docs/Metrics.md                                 |  209 +-
 docs/Resource_Aware_Scheduler_overview.md       |  132 +-
 docs/SECURITY.md                                |   40 +-
 docs/STORM-UI-REST-API.md                       |  135 +-
 docs/Setting-up-a-Storm-cluster.md              |    4 +-
 docs/State-checkpointing.md                     |    2 +
 docs/Windowing.md                               |  108 +
 docs/cgroups_in_storm.md                        |    2 +-
 docs/storm-kafka-client.md                      |  214 +-
 docs/storm-rocketmq.md                          |   28 +-
 .../storm-kafka-client-examples/README.markdown |   10 +
 examples/storm-kafka-client-examples/pom.xml    |    8 +-
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |   49 +
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  106 +
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   61 +
 .../TridentKafkaClientWordCountNamedTopics.java |   75 +-
 ...identKafkaClientWordCountWildcardTopics.java |   10 +-
 .../KafkaSpoutTopologyMainNamedTopicsLocal.java |   66 +
 ...fkaSpoutTopologyMainWildcardTopicsLocal.java |   29 +
 .../rocketmq/topology/WordCountTopology.java    |   12 +-
 .../rocketmq/trident/WordCountTrident.java      |   16 +-
 .../starter/PersistentWindowingTopology.java    |  176 ++
 .../starter/ResourceAwareExampleTopology.java   |  177 +-
 .../storm/starter/SlidingWindowTopology.java    |    1 +
 .../apache/storm/starter/StatefulTopology.java  |    2 +
 .../starter/StatefulWindowingTopology.java      |    1 +
 .../AbstractHadoopNimbusPluginAutoCreds.java    |   14 +-
 .../apache/storm/hbase/security/AutoHBase.java  |    3 +-
 .../storm/hbase/security/AutoHBaseCommand.java  |    5 +-
 .../storm/hbase/security/AutoHBaseNimbus.java   |   16 +-
 .../apache/storm/hdfs/security/AutoHDFS.java    |    3 +-
 .../storm/hdfs/security/AutoHDFSCommand.java    |    5 +-
 .../storm/hdfs/security/AutoHDFSNimbus.java     |   16 +-
 .../apache/storm/hive/security/AutoHive.java    |    3 +-
 .../storm/hive/security/AutoHiveCommand.java    |    5 +-
 .../storm/hive/security/AutoHiveNimbus.java     |   13 +-
 external/storm-cassandra/README.md              |    9 +
 .../AbstractExecutionResultHandler.java         |    1 +
 .../cassandra/BaseExecutionResultHandler.java   |    1 +
 .../storm/cassandra/CassandraContext.java       |    1 +
 .../cassandra/DynamicStatementBuilder.java      |    1 +
 .../storm/cassandra/ExecutionResultHandler.java |    1 +
 .../storm/cassandra/Murmur3StreamGrouping.java  |    3 +-
 .../storm/cassandra/bolt/BaseCassandraBolt.java |    1 +
 .../storm/cassandra/client/CassandraConf.java   |  122 +-
 .../storm/cassandra/client/ClusterFactory.java  |   20 +-
 external/storm-hbase/pom.xml                    |    2 -
 .../hbase/state/HBaseKeyValueStateProvider.java |   31 +-
 .../hive/bolt/mapper/JsonRecordHiveMapper.java  |    2 +-
 .../apache/storm/jdbc/common/JdbcClient.java    |  166 +-
 .../storm/jdbc/common/JdbcClientTest.java       |   43 +
 external/storm-jms/pom.xml                      |    2 +-
 .../org/apache/storm/jms/spout/JmsSpout.java    |  292 ++-
 .../apache/storm/jms/spout/JmsSpoutTest.java    |   79 +-
 external/storm-kafka-client/README.md           |    3 +-
 external/storm-kafka-client/pom.xml             |    5 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |    5 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  491 ++--
 .../spout/ManualPartitionNamedSubscription.java |   78 -
 .../ManualPartitionPatternSubscription.java     |   76 -
 .../storm/kafka/spout/ManualPartitioner.java    |   40 -
 .../storm/kafka/spout/NamedSubscription.java    |   61 -
 .../storm/kafka/spout/PatternSubscription.java  |   54 -
 .../spout/RoundRobinManualPartitioner.java      |   50 -
 .../kafka/spout/SerializableDeserializer.java   |   26 -
 .../apache/storm/kafka/spout/Subscription.java  |   53 -
 .../internal/KafkaConsumerFactoryDefault.java   |    3 +-
 .../kafka/spout/internal/OffsetManager.java     |    7 +-
 .../ManualPartitionSubscription.java            |   72 +
 .../spout/subscription/ManualPartitioner.java   |   41 +
 .../spout/subscription/NamedTopicFilter.java    |   67 +
 .../spout/subscription/PatternTopicFilter.java  |   69 +
 .../RoundRobinManualPartitioner.java            |   50 +
 .../kafka/spout/subscription/Subscription.java  |   53 +
 .../kafka/spout/subscription/TopicFilter.java   |   38 +
 .../trident/KafkaTridentSpoutBatchMetadata.java |    2 +-
 .../spout/trident/KafkaTridentSpoutManager.java |    3 +-
 .../kafka/trident/TridentKafkaStateUpdater.java |   32 +
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  131 ++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |    8 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   55 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   42 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  117 +
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   15 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   14 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |   73 +
 .../SingleTopicKafkaSpoutConfiguration.java     |   65 -
 .../SingleTopicKafkaSpoutConfiguration.java     |   66 +
 .../subscription/NamedTopicFilterTest.java      |   68 +
 .../subscription/PatternTopicFilterTest.java    |   73 +
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |   49 -
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  114 -
 .../KafkaSpoutTopologyMainWildcardTopics.java   |   57 -
 .../apache/storm/kafka/PartitionManager.java    |    4 +-
 .../org/apache/storm/kafka/KafkaTestBroker.java |  101 +-
 .../storm/kafka/PartitionManagerTest.java       |  243 ++
 .../spout/ExponentialBackoffRetrier.java        |   10 +-
 .../kinesis/spout/KinesisRecordsManager.java    |   18 +-
 external/storm-mongodb/README.md                |    1 +
 external/storm-mongodb/pom.xml                  |    5 +-
 .../storm/mongodb/bolt/AbstractMongoBolt.java   |   22 +-
 .../storm/mongodb/bolt/MongoInsertBolt.java     |   19 +-
 .../storm/mongodb/bolt/MongoLookupBolt.java     |   15 +-
 .../storm/mongodb/bolt/MongoUpdateBolt.java     |   12 +-
 .../storm/mongodb/common/MongoDBClient.java     |  103 -
 .../storm/mongodb/common/MongoDbClient.java     |  109 +
 .../apache/storm/mongodb/common/MongoUtils.java |   10 +-
 .../mongodb/common/QueryFilterCreator.java      |    9 +-
 .../common/SimpleQueryFilterCreator.java        |    9 +-
 .../common/mapper/MongoLookupMapper.java        |    9 +-
 .../mongodb/common/mapper/MongoMapper.java      |    5 +-
 .../common/mapper/MongoUpdateMapper.java        |    7 +-
 .../common/mapper/SimpleMongoLookupMapper.java  |   12 +-
 .../common/mapper/SimpleMongoMapper.java        |    9 +-
 .../common/mapper/SimpleMongoUpdateMapper.java  |    5 +-
 .../mongodb/trident/state/MongoMapState.java    |   40 +-
 .../storm/mongodb/trident/state/MongoState.java |   21 +-
 .../trident/state/MongoStateFactory.java        |    1 +
 .../mongodb/trident/state/MongoStateQuery.java  |    5 +-
 .../trident/state/MongoStateUpdater.java        |    1 +
 external/storm-pmml/pom.xml                     |    2 +-
 .../redis/state/RedisKeyValueStateProvider.java |   47 +-
 external/storm-rocketmq/README.md               |   28 +-
 external/storm-rocketmq/pom.xml                 |    3 -
 .../apache/storm/rocketmq/ConsumerMessage.java  |    1 +
 .../rocketmq/DefaultMessageBodySerializer.java  |    3 +-
 .../rocketmq/DefaultMessageRetryManager.java    |   20 +-
 .../storm/rocketmq/MessageBodySerializer.java   |    5 +-
 .../storm/rocketmq/MessageRetryManager.java     |   13 +-
 .../apache/storm/rocketmq/RocketMQConfig.java   |  162 --
 .../apache/storm/rocketmq/RocketMQUtils.java    |   64 -
 .../apache/storm/rocketmq/RocketMqConfig.java   |  178 ++
 .../apache/storm/rocketmq/RocketMqUtils.java    |   76 +
 .../org/apache/storm/rocketmq/SpoutConfig.java  |    3 +-
 .../storm/rocketmq/bolt/RocketMQBolt.java       |  160 --
 .../storm/rocketmq/bolt/RocketMqBolt.java       |  160 ++
 .../FieldNameBasedTupleToMessageMapper.java     |   14 +-
 .../common/mapper/TupleToMessageMapper.java     |    6 +-
 .../common/selector/DefaultTopicSelector.java   |    5 +-
 .../selector/FieldNameBasedTopicSelector.java   |    9 +-
 .../rocketmq/common/selector/TopicSelector.java |    6 +-
 .../storm/rocketmq/spout/RocketMQSpout.java     |  218 --
 .../storm/rocketmq/spout/RocketMqSpout.java     |  223 ++
 .../rocketmq/spout/scheme/KeyValueScheme.java   |    4 +-
 .../spout/scheme/StringKeyValueScheme.java      |    6 +-
 .../rocketmq/spout/scheme/StringScheme.java     |   23 +-
 .../rocketmq/trident/state/RocketMQState.java   |  117 -
 .../trident/state/RocketMQStateFactory.java     |   42 -
 .../trident/state/RocketMQStateUpdater.java     |   34 -
 .../rocketmq/trident/state/RocketMqState.java   |  123 +
 .../trident/state/RocketMqStateFactory.java     |   43 +
 .../trident/state/RocketMqStateUpdater.java     |   35 +
 .../storm/rocketmq/TestMessageRetryManager.java |   15 +-
 .../apache/storm/flux/parser/FluxParser.java    |   11 +-
 .../java/org/apache/storm/flux/TCKTest.java     |    9 +
 .../resources/configs/substitution-test.yaml    |    3 +-
 .../src/test/resources/configs/test.properties  |    1 +
 integration-test/run-it.sh                      |    2 +-
 .../org/apache/storm/st/wrapper/TopoWrap.java   |    4 +-
 pom.xml                                         |   16 +-
 sql/storm-sql-external/storm-sql-hdfs/pom.xml   |    5 +-
 .../storm/sql/hdfs/HdfsDataSourcesProvider.java |  150 +-
 sql/storm-sql-external/storm-sql-kafka/pom.xml  |    3 -
 .../sql/kafka/KafkaDataSourcesProvider.java     |  223 +-
 .../storm-sql-mongodb/pom.xml                   |    5 +-
 .../sql/mongodb/MongoDataSourcesProvider.java   |  136 +-
 .../mongodb/TestMongoDataSourcesProvider.java   |    4 +-
 sql/storm-sql-external/storm-sql-redis/pom.xml  |    3 -
 .../sql/redis/RedisDataSourcesProvider.java     |  292 +--
 sql/storm-sql-runtime/pom.xml                   |    3 -
 .../calcite/interpreter/StormContext.java       |    5 +-
 .../sql/runtime/AbstractChannelHandler.java     |   43 +-
 .../sql/runtime/AbstractValuesProcessor.java    |   23 +-
 .../storm/sql/runtime/ChannelContext.java       |   18 +-
 .../storm/sql/runtime/ChannelHandler.java       |   21 +-
 .../org/apache/storm/sql/runtime/Channels.java  |  149 +-
 .../apache/storm/sql/runtime/DataSource.java    |    3 +-
 .../storm/sql/runtime/DataSourcesProvider.java  |   42 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |   96 +-
 .../org/apache/storm/sql/runtime/FieldInfo.java |   43 +-
 .../storm/sql/runtime/IOutputSerializer.java    |   15 +-
 .../sql/runtime/ISqlTridentDataSource.java      |   72 +-
 .../sql/runtime/SimpleSqlTridentConsumer.java   |    1 +
 .../storm/sql/runtime/StormSqlFunctions.java    |   34 +-
 .../runtime/calcite/ExecutableExpression.java   |    5 +-
 .../sql/runtime/calcite/StormDataContext.java   |   13 +-
 .../socket/SocketDataSourcesProvider.java       |   13 +-
 .../datasource/socket/trident/SocketState.java  |   12 +-
 .../socket/trident/SocketStateUpdater.java      |    6 +-
 .../socket/trident/TridentSocketSpout.java      |   33 +-
 .../sql/runtime/serde/avro/AvroScheme.java      |   74 +-
 .../sql/runtime/serde/avro/AvroSerializer.java  |   75 +-
 .../sql/runtime/serde/avro/CachedSchemas.java   |   12 +-
 .../storm/sql/runtime/serde/csv/CsvScheme.java  |   60 +-
 .../sql/runtime/serde/csv/CsvSerializer.java    |   39 +-
 .../sql/runtime/serde/json/JsonScheme.java      |   56 +-
 .../sql/runtime/serde/json/JsonSerializer.java  |   46 +-
 .../storm/sql/runtime/serde/tsv/TsvScheme.java  |   54 +-
 .../sql/runtime/serde/tsv/TsvSerializer.java    |   41 +-
 .../trident/functions/EvaluationCalc.java       |   17 +-
 .../trident/functions/EvaluationFilter.java     |   10 +-
 .../trident/functions/EvaluationFunction.java   |   11 +-
 .../trident/functions/ForwardFunction.java      |    1 +
 .../storm/sql/runtime/utils/FieldInfoUtils.java |    4 +-
 .../storm/sql/runtime/utils/SerdeUtils.java     |   41 +-
 .../apache/storm/sql/runtime/utils/Utils.java   |   21 +-
 storm-buildtools/storm_checkstyle.xml           |    6 +
 storm-client-misc/pom.xml                       |    2 +-
 .../src/jvm/org/apache/storm/Config.java        |   59 +-
 .../jvm/org/apache/storm/StormSubmitter.java    |   63 +-
 .../storm/cluster/StormClusterStateImpl.java    |   21 +-
 .../coordination/BatchSubtopologyBuilder.java   |   19 +-
 .../org/apache/storm/daemon/StormCommon.java    |    6 +-
 .../daemon/metrics/ErrorReportingMetrics.java   |   38 +
 .../storm/daemon/supervisor/AdvancedFSOps.java  |   51 +-
 .../supervisor/ClientSupervisorUtils.java       |    8 +-
 .../storm/daemon/supervisor/IAdvancedFSOps.java |   10 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |   26 +-
 .../jvm/org/apache/storm/executor/Executor.java |    9 +
 .../storm/executor/bolt/BoltExecutor.java       |    2 +
 .../executor/bolt/BoltOutputCollectorImpl.java  |    1 +
 .../storm/executor/spout/SpoutExecutor.java     |    1 +
 .../spout/SpoutOutputCollectorImpl.java         |    1 +
 .../org/apache/storm/generated/Assignment.java  |  519 +++--
 .../org/apache/storm/generated/BoltStats.java   |  440 ++--
 .../apache/storm/generated/ClusterSummary.java  |  108 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |   52 +-
 .../storm/generated/CommonAggregateStats.java   |   44 +-
 .../storm/generated/ComponentPageInfo.java      |  264 +--
 .../org/apache/storm/generated/Credentials.java |   44 +-
 .../apache/storm/generated/ExecutorStats.java   |  168 +-
 .../jvm/org/apache/storm/generated/HBNodes.java |   32 +-
 .../org/apache/storm/generated/HBRecords.java   |   36 +-
 .../storm/generated/LSApprovedWorkers.java      |   44 +-
 .../generated/LSSupervisorAssignments.java      |   48 +-
 .../apache/storm/generated/LSTopoHistory.java   |   64 +-
 .../storm/generated/LSTopoHistoryList.java      |   36 +-
 .../storm/generated/LSWorkerHeartbeat.java      |   36 +-
 .../apache/storm/generated/ListBlobsResult.java |   32 +-
 .../apache/storm/generated/LocalAssignment.java |  253 ++-
 .../apache/storm/generated/LocalStateData.java  |   48 +-
 .../org/apache/storm/generated/LogConfig.java   |   48 +-
 .../jvm/org/apache/storm/generated/Nimbus.java  | 2059 ++++++++++++-----
 .../org/apache/storm/generated/NodeInfo.java    |   32 +-
 .../storm/generated/OwnerResourceSummary.java   | 2097 ++++++++++++++++++
 .../storm/generated/RebalanceOptions.java       |   44 +-
 .../storm/generated/SettableBlobMeta.java       |   36 +-
 .../apache/storm/generated/SharedMemory.java    |  711 ++++++
 .../org/apache/storm/generated/SpoutStats.java  |  252 +--
 .../org/apache/storm/generated/StormBase.java   |  312 ++-
 .../apache/storm/generated/StormTopology.java   |  531 ++++-
 .../apache/storm/generated/SupervisorInfo.java  |  152 +-
 .../storm/generated/SupervisorPageInfo.java     |   72 +-
 .../storm/generated/SupervisorSummary.java      |   44 +-
 .../storm/generated/TopologyHistoryInfo.java    |   32 +-
 .../apache/storm/generated/TopologyInfo.java    |  160 +-
 .../storm/generated/TopologyPageInfo.java       | 1062 ++++++++-
 .../apache/storm/generated/TopologyStats.java   |  220 +-
 .../apache/storm/generated/TopologySummary.java |  146 +-
 .../apache/storm/generated/WorkerResources.java |  206 +-
 .../apache/storm/generated/WorkerSummary.java   |   44 +-
 .../storm/metric/cgroup/CGroupMemoryLimit.java  |   16 +
 .../apache/storm/pacemaker/PacemakerClient.java |   28 +-
 .../storm/pacemaker/codec/ThriftDecoder.java    |   31 +-
 .../pacemaker/codec/ThriftNettyClientCodec.java |    7 +-
 .../jvm/org/apache/storm/scheduler/Cluster.java |  837 -------
 .../jvm/org/apache/storm/scheduler/INimbus.java |   49 -
 .../org/apache/storm/scheduler/IScheduler.java  |   40 -
 .../org/apache/storm/scheduler/ISupervisor.java |   45 -
 .../storm/scheduler/SchedulerAssignment.java    |   29 +-
 .../scheduler/SchedulerAssignmentImpl.java      |  141 +-
 .../storm/scheduler/SupervisorDetails.java      |    4 +-
 .../org/apache/storm/scheduler/Topologies.java  |   87 -
 .../apache/storm/scheduler/TopologyDetails.java |  517 -----
 .../org/apache/storm/scheduler/WorkerSlot.java  |   43 +-
 .../storm/scheduler/resource/Component.java     |   54 -
 .../storm/scheduler/resource/RAS_Node.java      |  529 -----
 .../storm/scheduler/resource/RAS_Nodes.java     |  138 --
 .../storm/scheduler/resource/ResourceUtils.java |  207 --
 .../scheduler/resource/SchedulingResult.java    |  116 -
 .../scheduler/resource/SchedulingState.java     |   56 -
 .../scheduler/resource/SchedulingStatus.java    |   40 -
 .../apache/storm/scheduler/resource/User.java   |  350 ---
 .../DefaultResourceAwareStrategy.java           |  757 -------
 .../strategies/scheduling/IStrategy.java        |   47 -
 .../storm/security/INimbusCredentialPlugin.java |   25 +-
 .../org/apache/storm/security/auth/AutoSSL.java |    8 +-
 .../security/auth/DefaultPrincipalToLocal.java  |    5 +-
 .../security/auth/ICredentialsRenewer.java      |   20 +-
 .../storm/security/auth/IPrincipalToLocal.java  |    6 +-
 .../security/auth/KerberosPrincipalToLocal.java |    5 +-
 .../auth/authorizer/SimpleACLAuthorizer.java    |   65 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |    2 +-
 .../serialization/SerializationFactory.java     |   63 +-
 .../apache/storm/state/DefaultStateEncoder.java |    5 +-
 .../storm/state/DefaultStateSerializer.java     |   64 +-
 .../topology/BaseConfigurationDeclarer.java     |    9 +-
 .../ComponentConfigurationDeclarer.java         |   36 +
 .../storm/topology/IStatefulWindowedBolt.java   |   22 +-
 .../PersistentWindowedBoltExecutor.java         |  256 +++
 .../apache/storm/topology/ResourceDeclarer.java |   30 +
 .../storm/topology/SharedOffHeapWithinNode.java |   38 +
 .../topology/SharedOffHeapWithinWorker.java     |   36 +
 .../org/apache/storm/topology/SharedOnHeap.java |   38 +
 .../storm/topology/StatefulBoltExecutor.java    |    7 +-
 .../apache/storm/topology/TopologyBuilder.java  |   54 +-
 .../storm/topology/WindowedBoltExecutor.java    |   86 +-
 .../topology/base/BaseStatefulWindowedBolt.java |   39 +
 .../TransactionalTopologyBuilder.java           |   47 +-
 .../jvm/org/apache/storm/trident/Stream.java    |    7 +
 .../org/apache/storm/trident/TridentState.java  |    7 +
 .../apache/storm/trident/TridentTopology.java   |    5 +
 .../org/apache/storm/trident/graph/Group.java   |   12 +
 .../operation/DefaultResourceDeclarer.java      |   24 +-
 .../trident/operation/ITridentResource.java     |    8 +
 .../topology/TridentTopologyBuilder.java        |   46 +-
 .../windowing/AbstractTridentWindowManager.java |    4 +-
 .../strategy/SlidingCountWindowStrategy.java    |    4 +-
 .../strategy/SlidingDurationWindowStrategy.java |    4 +-
 .../strategy/TumblingCountWindowStrategy.java   |    4 +-
 .../TumblingDurationWindowStrategy.java         |    4 +-
 .../windowing/strategy/WindowStrategy.java      |    4 +-
 .../org/apache/storm/utils/DisruptorQueue.java  |   14 +-
 .../apache/storm/utils/ThriftTopologyUtils.java |   40 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |    2 +-
 .../storm/windowing/CountEvictionPolicy.java    |   17 +-
 .../storm/windowing/CountTriggerPolicy.java     |   24 +-
 .../jvm/org/apache/storm/windowing/Event.java   |    2 +-
 .../apache/storm/windowing/EvictionPolicy.java  |   35 +-
 .../storm/windowing/StatefulWindowManager.java  |  164 ++
 .../storm/windowing/TimeEvictionPolicy.java     |   25 +-
 .../storm/windowing/TimeTriggerPolicy.java      |   16 +-
 .../apache/storm/windowing/TriggerPolicy.java   |   20 +-
 .../storm/windowing/TupleWindowIterImpl.java    |   80 +
 .../windowing/WatermarkCountEvictionPolicy.java |   61 +-
 .../windowing/WatermarkCountTriggerPolicy.java  |   26 +-
 .../windowing/WatermarkTimeEvictionPolicy.java  |    1 +
 .../windowing/WatermarkTimeTriggerPolicy.java   |   26 +-
 .../jvm/org/apache/storm/windowing/Window.java  |   31 +-
 .../windowing/WindowLifecycleListener.java      |   19 +-
 .../apache/storm/windowing/WindowManager.java   |   51 +-
 .../persistence/SimpleWindowPartitionCache.java |  203 ++
 .../persistence/WindowPartitionCache.java       |  142 ++
 .../windowing/persistence/WindowState.java      |  424 ++++
 storm-client/src/py/storm/Nimbus-remote         |    7 +
 storm-client/src/py/storm/Nimbus.py             |  229 +-
 storm-client/src/py/storm/ttypes.py             | 1931 ++++++++++------
 storm-client/src/storm.thrift                   |   50 +
 .../org/apache/storm/scheduler/ClusterTest.java |  111 -
 .../storm/security/auth/AuthUtilsTestMock.java  |    6 +-
 .../authorizer/SimpleACLAuthorizerTest.java     |  388 ++++
 .../storm/state/DefaultStateSerializerTest.java |    3 +-
 .../PersistentWindowedBoltExecutorTest.java     |  299 +++
 .../SimpleWindowPartitionCacheTest.java         |  233 ++
 .../storm/windowing/WindowManagerTest.java      |   46 +-
 .../windowing/persistence/WindowStateTest.java  |  246 ++
 .../clj/org/apache/storm/daemon/logviewer.clj   | 1254 -----------
 storm-core/src/clj/org/apache/storm/ui/core.clj |  147 +-
 .../dev/logviewer-search-context-tests.log.gz   |  Bin 72 -> 0 bytes
 .../dev/logviewer-search-context-tests.log.test |    1 -
 storm-core/src/dev/small-worker.log.test        |    1 -
 storm-core/src/dev/test-3072.log.test           |    3 -
 storm-core/src/dev/test-worker.log.test         |  380 ----
 .../daemon/ClientJarTransformerRunner.java      |   49 -
 .../apache/storm/daemon/DirectoryCleaner.java   |  174 --
 .../org/apache/storm/daemon/JarTransformer.java |   31 -
 storm-core/src/ui/public/component.html         |   22 +-
 storm-core/src/ui/public/css/style.css          |   25 +
 .../src/ui/public/deep_search_result.html       |   24 +-
 storm-core/src/ui/public/index.html             |   18 +
 storm-core/src/ui/public/js/script.js           |   74 +
 storm-core/src/ui/public/logviewer_search.html  |    8 +-
 storm-core/src/ui/public/owner.html             |  211 ++
 storm-core/src/ui/public/search_result.html     |   10 +-
 storm-core/src/ui/public/supervisor.html        |    6 +
 .../deep-search-result-page-template.html       |    8 +-
 .../public/templates/index-page-template.html   |   73 +-
 .../public/templates/owner-page-template.html   |  239 ++
 .../templates/topology-page-template.html       |  108 +-
 storm-core/src/ui/public/topology.html          |    7 +
 .../apache/storm/trident/integration_test.clj   |    2 +-
 .../clj/org/apache/storm/logviewer_test.clj     |  824 -------
 .../test/clj/org/apache/storm/nimbus_test.clj   |  161 +-
 .../scheduler/multitenant_scheduler_test.clj    |  160 +-
 .../clj/org/apache/storm/scheduler_test.clj     |   29 +-
 .../test/jvm/org/apache/storm/MockAutoCred.java |    4 +-
 .../final-package/src/main/assembly/binary.xml  |    6 +-
 .../src/main/resources/resources/storm.py       |    2 +-
 storm-rename-hack/pom.xml                       |  131 --
 .../org/apache/storm/hack/DefaultShader.java    |  391 ----
 .../main/java/org/apache/storm/hack/IOUtil.java |   41 -
 .../org/apache/storm/hack/ShadeRequest.java     |   69 -
 .../apache/storm/hack/StormShadeRequest.java    |   41 -
 .../storm/hack/StormShadeTransformer.java       |   37 -
 .../apache/storm/hack/relocation/Relocator.java |   40 -
 .../storm/hack/relocation/SimpleRelocator.java  |   97 -
 .../storm/hack/resource/ClojureTransformer.java |   71 -
 .../hack/resource/ResourceTransformer.java      |   46 -
 .../java/org/apache/storm/DaemonConfig.java     |  102 +-
 .../java/org/apache/storm/LocalCluster.java     |   39 +-
 .../apache/storm/blobstore/BlobStoreUtils.java  |    4 +-
 .../container/ResourceIsolationInterface.java   |   51 +-
 .../storm/container/cgroup/CgroupManager.java   |  161 +-
 .../reporters/JmxPreparableReporter.java        |    1 -
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  381 +++-
 .../storm/daemon/nimbus/TopologyResources.java  |  269 ++-
 .../storm/daemon/supervisor/BasicContainer.java |  203 +-
 .../storm/daemon/supervisor/Container.java      |  151 +-
 .../daemon/supervisor/ReadClusterState.java     |   20 +-
 .../apache/storm/daemon/supervisor/Slot.java    |   73 +-
 .../storm/daemon/supervisor/Supervisor.java     |   16 +-
 .../daemon/supervisor/SupervisorUtils.java      |    7 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |   13 +-
 .../apache/storm/localizer/AsyncLocalizer.java  |   43 +-
 .../logging/filters/AccessLoggingFilter.java    |   36 +-
 .../apache/storm/pacemaker/PacemakerServer.java |    6 +-
 .../pacemaker/codec/ThriftNettyServerCodec.java |    8 +-
 .../org/apache/storm/scheduler/Cluster.java     | 1050 +++++++++
 .../org/apache/storm/scheduler/Component.java   |   88 +
 .../storm/scheduler/DefaultScheduler.java       |   13 +-
 .../apache/storm/scheduler/EvenScheduler.java   |   30 +-
 .../org/apache/storm/scheduler/INimbus.java     |   47 +
 .../org/apache/storm/scheduler/IScheduler.java  |   47 +
 .../storm/scheduler/ISchedulingState.java       |  275 +++
 .../org/apache/storm/scheduler/ISupervisor.java |   50 +
 .../storm/scheduler/IsolationScheduler.java     |    5 +
 .../storm/scheduler/SingleTopologyCluster.java  |   46 +
 .../org/apache/storm/scheduler/Topologies.java  |  139 ++
 .../apache/storm/scheduler/TopologyDetails.java |  573 +++++
 .../multitenant/MultitenantScheduler.java       |    7 +-
 .../storm/scheduler/resource/RAS_Node.java      |  465 ++++
 .../storm/scheduler/resource/RAS_Nodes.java     |  152 ++
 .../resource/ResourceAwareScheduler.java        |  456 ++--
 .../storm/scheduler/resource/ResourceUtils.java |  175 ++
 .../scheduler/resource/SchedulingResult.java    |   88 +
 .../scheduler/resource/SchedulingStatus.java    |   41 +
 .../apache/storm/scheduler/resource/User.java   |  228 ++
 .../eviction/DefaultEvictionStrategy.java       |   68 +-
 .../strategies/eviction/IEvictionStrategy.java  |   17 +-
 .../DefaultSchedulingPriorityStrategy.java      |   47 +-
 .../priority/ISchedulingPriorityStrategy.java   |   13 +-
 .../DefaultResourceAwareStrategy.java           |  741 +++++++
 .../strategies/scheduling/IStrategy.java        |   48 +
 .../apache/storm/utils/ServerConfigUtils.java   |    3 +-
 .../org/apache/storm/utils/ServerUtils.java     |    2 +-
 .../java/org/apache/storm/LocalStateTest.java   |    2 +-
 .../java/org/apache/storm/MessagingTest.java    |    3 -
 .../test/java/org/apache/storm/TestCgroups.java |    5 +-
 .../apache/storm/blobstore/BlobStoreTest.java   |    2 +-
 .../apache/storm/daemon/nimbus/NimbusTest.java  |   56 +
 .../daemon/supervisor/BasicContainerTest.java   |    2 +
 .../storm/daemon/supervisor/ContainerTest.java  |    6 +-
 .../storm/localizer/AsyncLocalizerTest.java     |    8 +-
 .../org/apache/storm/scheduler/ClusterTest.java |  111 +
 .../resource/TestResourceAwareScheduler.java    |  923 ++------
 .../storm/scheduler/resource/TestUser.java      |   95 +-
 .../TestUtilsForResourceAwareScheduler.java     |  159 +-
 .../eviction/TestDefaultEvictionStrategy.java   |  793 ++-----
 .../TestDefaultResourceAwareStrategy.java       |  214 +-
 storm-webapp/pom.xml                            |   51 +-
 .../common/AuthorizationExceptionMapper.java    |   40 +
 .../daemon/common/JsonResponseBuilder.java      |   74 +
 .../webapp/AuthorizationExceptionMapper.java    |   40 -
 .../daemon/drpc/webapp/DRPCApplication.java     |    1 +
 .../daemon/logviewer/LogviewerConstant.java     |   29 +
 .../storm/daemon/logviewer/LogviewerServer.java |  172 ++
 .../handler/LogviewerLogDownloadHandler.java    |   71 +
 .../handler/LogviewerLogPageHandler.java        |  470 ++++
 .../handler/LogviewerLogSearchHandler.java      |  753 +++++++
 .../handler/LogviewerProfileHandler.java        |  140 ++
 .../logviewer/utils/DirectoryCleaner.java       |  196 ++
 .../daemon/logviewer/utils/LogCleaner.java      |  267 +++
 .../logviewer/utils/LogFileDownloader.java      |   67 +
 .../utils/LogviewerResponseBuilder.java         |  157 ++
 .../logviewer/utils/ResourceAuthorizer.java     |  158 ++
 .../daemon/logviewer/utils/WorkerLogs.java      |  219 ++
 .../logviewer/webapp/LogviewerApplication.java  |  102 +
 .../logviewer/webapp/LogviewerResource.java     |  269 +++
 .../daemon/utils/ListFunctionalSupport.java     |  108 +
 .../apache/storm/daemon/utils/StreamUtil.java   |   47 +
 .../apache/storm/daemon/utils/UrlBuilder.java   |   53 +
 .../storm/daemon/drpc/DRPCServerTest.java       |   13 +-
 .../handler/LogviewerLogPageHandlerTest.java    |  105 +
 .../handler/LogviewerLogSearchHandlerTest.java  |  856 +++++++
 .../testsupport/ArgumentsVerifier.java          |   43 +
 .../testsupport/MockDirectoryBuilder.java       |   72 +
 .../logviewer/testsupport/MockFileBuilder.java  |   72 +
 .../daemon/logviewer/utils/LogCleanerTest.java  |  350 +++
 .../logviewer/utils/ResourceAuthorizerTest.java |  182 ++
 .../daemon/logviewer/utils/WorkerLogsTest.java  |   75 +
 .../logviewer-search-context-tests.log.gz       |  Bin 0 -> 72 bytes
 .../logviewer-search-context-tests.log.test     |    1 +
 .../src/test/resources/small-worker.log.test    |    1 +
 .../src/test/resources/test-3072.log.test       |    3 +
 .../src/test/resources/test-worker.log.test     |  380 ++++
 510 files changed, 33466 insertions(+), 18609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/832395a1/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/832395a1/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --cc 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 6ceb0cd,83231cd..19838f4
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@@ -23,9 -31,8 +31,9 @@@ import org.apache.storm.scheduler.Topol
  import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
  import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
  import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
- import org.apache.storm.utils.Utils;
  import org.apache.storm.utils.ReflectionUtils;
 +import org.apache.storm.utils.DisallowedStrategyException;
+ import org.apache.storm.utils.Utils;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -99,253 -80,94 +81,101 @@@ public class ResourceAwareScheduler imp
              if (td == null) {
                  break;
              }
-             scheduleTopology(td);
- 
-             LOG.debug("Nodes after scheduling:\n{}", 
this.schedulingState.nodes);
+             User submitter = userMap.get(td.getTopologySubmitter());
+             if (cluster.needsSchedulingRas(td)) {
+                 scheduleTopology(td, cluster, submitter, userMap);
+             } else {
+                 LOG.warn("Topology {} is already fully scheduled!", 
td.getName());
+                 cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
+             }
          }
- 
-         //update changes to cluster
-         updateChanges(cluster, topologies);
-     }
- 
-     private void updateChanges(Cluster cluster, Topologies topologies) {
-         //Cannot simply set this.cluster=schedulingState.cluster since 
clojure is immutable
-         cluster.setAssignments(schedulingState.cluster.getAssignments());
-         
cluster.setBlacklistedHosts(schedulingState.cluster.getBlacklistedHosts());
-         cluster.setStatusMap(schedulingState.cluster.getStatusMap());
-         
cluster.setSupervisorsResourcesMap(schedulingState.cluster.getSupervisorsResourcesMap());
-         
cluster.setTopologyResourcesMap(schedulingState.cluster.getTopologyResourcesMap());
-         
cluster.setWorkerResourcesMap(schedulingState.cluster.getWorkerResourcesMap());
-         //updating resources used by supervisor
-         updateSupervisorsResources(cluster, topologies);
-     }
- 
-     private void handleSchedulingError(TopologyDetails td, SchedulingState 
schedulingState, Exception e) {
-         LOG.error("failed to create instance of IStrategy: {} with error: {}! 
Topology {} will not be scheduled.",
-                   td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), 
e.getMessage(), td.getName());
-         User topologySubmitter = cleanup(schedulingState, td);
-         topologySubmitter.moveTopoFromPendingToInvalid(td);
      }
  
-     public void scheduleTopology(TopologyDetails td) {
-         User topologySubmitter = 
this.schedulingState.userMap.get(td.getTopologySubmitter());
-         if (this.schedulingState.cluster.getUnassignedExecutors(td).size() > 
0) {
-             LOG.debug("/********Scheduling topology {} from User 
{}************/", td.getName(), topologySubmitter);
 +
-             SchedulingState schedulingState = checkpointSchedulingState();
-             IStrategy rasStrategy = null;
+     public void scheduleTopology(TopologyDetails td, Cluster cluster, final 
User topologySubmitter,
+                                  Map<String, User> userMap) {
+         //A copy of cluster that we can modify, but does not get committed 
back to cluster unless scheduling succeeds
+         Cluster workingState = new Cluster(cluster);
+         IStrategy rasStrategy = null;
+         String strategyConf = (String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
+         try {
 -            rasStrategy = (IStrategy) 
ReflectionUtils.newInstance(strategyConf);
++            rasStrategy = (IStrategy) 
ReflectionUtils.newSchedulerStrategyInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
+             rasStrategy.prepare(conf);
++        } catch (DisallowedStrategyException e) {
++            topologySubmitter.markTopoUnsuccess(td);
++            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + 
e.getAttemptedClass()
++                              + " is not an allowed strategy. Please make 
sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
++                              + " config is one of the allowed strategies: " 
+ e.getAllowedStrategies().toString());
++            return;
+         } catch (RuntimeException e) {
+             LOG.error("failed to create instance of IStrategy: {} Topology {} 
will not be scheduled.",
+                     strategyConf, td.getName(), e);
+             topologySubmitter.markTopoUnsuccess(td);
+             cluster.setStatus(td.getId(), "Unsuccessful in scheduling - 
failed to create instance of topology strategy "
+                     + strategyConf + ". Please check logs for details");
+             return;
+         }
+        
+         while (true) {
+             // A copy of the cluster that restricts the strategy to only 
modify a single topology
+             SingleTopologyCluster toSchedule = new 
SingleTopologyCluster(workingState, td.getId());
+             SchedulingResult result = null;
              try {
-                 rasStrategy = (IStrategy) 
ReflectionUtils.newSchedulerStrategyInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
-             } catch (DisallowedStrategyException e) {
-                 handleSchedulingError(td, schedulingState, e);
-                 this.schedulingState.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - " + e.getAttemptedClass()
-                                                        + " is not an allowed 
strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
-                                                        + " config is one of 
the allowed strategies: " + e.getAllowedStrategies().toString());
-                 return;
-             } catch (RuntimeException e) {
-                 handleSchedulingError(td, schedulingState, e);
-                 this.schedulingState.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - failed to create instance of topology strategy "
-                                                        + 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for 
details");
-                 return;
+                 result = rasStrategy.schedule(toSchedule, td);
+             } catch (Exception ex) {
+                 LOG.error("Exception thrown when running strategy {} to 
schedule topology {}."
+                         + " Topology will not be scheduled!", 
rasStrategy.getClass().getName(), td.getName(), ex);
+                 topologySubmitter.markTopoUnsuccess(td);
+                 cluster.setStatus(td.getId(), "Unsuccessful in scheduling - 
Exception thrown when running strategy {}"
+                         + rasStrategy.getClass().getName() + ". Please check 
logs for details");
              }
-             IEvictionStrategy evictionStrategy = null;
-             while (true) {
-                 SchedulingResult result = null;
-                 try {
-                     // Need to re prepare scheduling strategy with cluster 
and topologies in case scheduling state was restored
-                     // Pass in a copy of scheduling state since the 
scheduling strategy should not be able to be able to make modifications to
-                     // the state of cluster directly
-                     rasStrategy.prepare(new 
SchedulingState(this.schedulingState));
-                     result = rasStrategy.schedule(td);
-                 } catch (Exception ex) {
-                     LOG.error(String.format("Exception thrown when running 
strategy %s to schedule topology %s. Topology will not be scheduled!"
-                             , rasStrategy.getClass().getName(), 
td.getName()), ex);
-                     topologySubmitter = cleanup(schedulingState, td);
-                     topologySubmitter.moveTopoFromPendingToInvalid(td);
-                     this.schedulingState.cluster.setStatus(td.getId(), 
"Unsuccessful in scheduling - Exception thrown when running strategy {}"
-                             + rasStrategy.getClass().getName() + ". Please 
check logs for details");
-                 }
-                 LOG.debug("scheduling result: {}", result);
-                 if (result != null && result.isValid()) {
-                     if (result.isSuccess()) {
+             LOG.debug("scheduling result: {}", result);
+             if (result != null) {
+                 if (result.isSuccess()) {
+                     try {
+                         cluster.updateFrom(toSchedule);
+                         cluster.setStatus(td.getId(), "Running - " + 
result.getMessage());
+                     } catch (Exception ex) {
+                         LOG.error("Unsuccessful attempting to assign 
executors to nodes.", ex);
+                         topologySubmitter.markTopoUnsuccess(td);
+                         cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - "
+                             + "IllegalStateException thrown when attempting 
to assign executors to nodes. Please check"
+                             + " log for details.");
+                     }
+                     return;
+                 } else {
+                     if (result.getStatus() == 
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
+                         boolean madeSpace = false;
                          try {
-                             if (mkAssignment(td, 
result.getSchedulingResultMap())) {
-                                 
topologySubmitter.moveTopoFromPendingToRunning(td);
-                                 
this.schedulingState.cluster.setStatus(td.getId(), "Running - " + 
result.getMessage());
-                             } else {
-                                 topologySubmitter = 
this.cleanup(schedulingState, td);
-                                 
topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                 
this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling 
- Unable to assign executors to nodes. Please check logs for details");
-                             }
-                         } catch (IllegalStateException ex) {
-                             LOG.error("Unsuccessful in scheduling - 
IllegalStateException thrown when attempting to assign executors to nodes.", 
ex);
-                             topologySubmitter = cleanup(schedulingState, td);
-                             
topologySubmitter.moveTopoFromPendingToAttempted(td);
-                             
this.schedulingState.cluster.setStatus(td.getId(), "Unsuccessful in scheduling 
- IllegalStateException thrown when attempting to assign executors to nodes. 
Please check log for details.");
+                             //need to re prepare since scheduling state might 
have been restored
+                             madeSpace = evictionStrategy.makeSpaceForTopo(td, 
workingState, userMap);
+                         } catch (Exception ex) {
+                             LOG.error("Exception thrown when running eviction 
strategy {} to schedule topology {}."
+                                     + " No evictions will be done!", 
evictionStrategy.getClass().getName(),
+                                 td.getName(), ex);
+                             topologySubmitter.markTopoUnsuccess(td);
+                             return;
                          }
-                         break;
-                     } else {
-                         if (result.getStatus() == 
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
-                             if (evictionStrategy == null) {
-                                 try {
-                                     evictionStrategy = (IEvictionStrategy) 
ReflectionUtils.newInstance((String) 
this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
-                                 } catch (RuntimeException e) {
-                                     LOG.error("failed to create instance of 
eviction strategy: {} with error: {}! No topology eviction will be done.",
-                                             
this.conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), 
e.getMessage());
-                                     
topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                     break;
-                                 }
-                             }
-                             boolean madeSpace = false;
-                             try {
-                                 //need to re prepare since scheduling state 
might have been restored
-                                 
evictionStrategy.prepare(this.schedulingState);
-                                 madeSpace = 
evictionStrategy.makeSpaceForTopo(td);
-                             } catch (Exception ex) {
-                                 LOG.error(String.format("Exception thrown 
when running eviction strategy %s to schedule topology %s. No evictions will be 
done! Error: %s"
-                                         , 
evictionStrategy.getClass().getName(), td.getName(), ex.getClass().getName()), 
ex);
-                                 topologySubmitter = cleanup(schedulingState, 
td);
-                                 
topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                 break;
-                             }
-                             if (!madeSpace) {
-                                 LOG.debug("Could not make space for topo {} 
will move to attempted", td);
-                                 topologySubmitter = cleanup(schedulingState, 
td);
-                                 
topologySubmitter.moveTopoFromPendingToAttempted(td);
-                                 
this.schedulingState.cluster.setStatus(td.getId(), "Not enough resources to 
schedule - " + result.getErrorMessage());
-                                 break;
-                             }
-                             continue;
-                         } else if (result.getStatus() == 
SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
-                             topologySubmitter = cleanup(schedulingState, td);
-                             
topologySubmitter.moveTopoFromPendingToInvalid(td, 
this.schedulingState.cluster);
-                             break;
-                         } else {
-                             topologySubmitter = cleanup(schedulingState, td);
-                             
topologySubmitter.moveTopoFromPendingToAttempted(td, 
this.schedulingState.cluster);
-                             break;
+                         if (!madeSpace) {
+                             LOG.debug("Could not make space for topo {} will 
move to attempted", td);
+                             topologySubmitter.markTopoUnsuccess(td);
+                             cluster.setStatus(td.getId(), "Not enough 
resources to schedule - "
+                                 + result.getErrorMessage());
+                             return;
                          }
+                         continue;
+                     } else {
+                         topologySubmitter.markTopoUnsuccess(td, cluster);
+                         return;
                      }
-                 } else {
-                     LOG.warn("Scheduling results returned from topology {} is 
not vaild! Topology with be ignored.", td.getName());
-                     topologySubmitter = cleanup(schedulingState, td);
-                     topologySubmitter.moveTopoFromPendingToInvalid(td, 
this.schedulingState.cluster);
-                     break;
                  }
+             } else {
+                 LOG.warn("Scheduling results returned from topology {} is not 
vaild! Topology with be ignored.",
+                     td.getName());
+                 topologySubmitter.markTopoUnsuccess(td, cluster);
+                 return;
              }
-         } else {
-             LOG.warn("Topology {} is already fully scheduled!", td.getName());
-             topologySubmitter.moveTopoFromPendingToRunning(td);
-             if (this.schedulingState.cluster.getStatusMap().get(td.getId()) 
== null || 
this.schedulingState.cluster.getStatusMap().get(td.getId()).equals("")) {
-                 this.schedulingState.cluster.setStatus(td.getId(), "Fully 
Scheduled");
-             }
-         }
-     }
- 
-     private User cleanup(SchedulingState schedulingState, TopologyDetails td) 
{
-         restoreCheckpointSchedulingState(schedulingState);
-         //since state is restored need the update User topologySubmitter to 
the new User object in userMap
-         return this.schedulingState.userMap.get(td.getTopologySubmitter());
-     }
- 
-     private boolean mkAssignment(TopologyDetails td, Map<WorkerSlot, 
Collection<ExecutorDetails>> schedulerAssignmentMap) {
-         if (schedulerAssignmentMap != null) {
-             double requestedMemOnHeap = td.getTotalRequestedMemOnHeap();
-             double requestedMemOffHeap = td.getTotalRequestedMemOffHeap();
-             double requestedCpu = td.getTotalRequestedCpu();
-             double assignedMemOnHeap = 0.0;
-             double assignedMemOffHeap = 0.0;
-             double assignedCpu = 0.0;
- 
-             Map<WorkerSlot, Double[]> workerResources = new 
HashMap<WorkerSlot, Double[]>();
- 
-             Set<String> nodesUsed = new HashSet<String>();
-             for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> 
workerToTasksEntry : schedulerAssignmentMap.entrySet()) {
-                 WorkerSlot targetSlot = workerToTasksEntry.getKey();
-                 Collection<ExecutorDetails> execsNeedScheduling = 
workerToTasksEntry.getValue();
-                 RAS_Node targetNode = 
this.schedulingState.nodes.getNodeById(targetSlot.getNodeId());
- 
-                 targetSlot = allocateResourceToSlot(td, execsNeedScheduling, 
targetSlot);
- 
-                 targetNode.assign(targetSlot, td, execsNeedScheduling);
- 
-                 LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} 
on Slot: {}",
-                         td.getName(), execsNeedScheduling, 
targetNode.getHostname(), targetSlot.getPort());
- 
-                 for (ExecutorDetails exec : execsNeedScheduling) {
-                     targetNode.consumeResourcesforTask(exec, td);
-                 }
-                 if (!nodesUsed.contains(targetNode.getId())) {
-                     nodesUsed.add(targetNode.getId());
-                 }
-                 assignedMemOnHeap += targetSlot.getAllocatedMemOnHeap();
-                 assignedMemOffHeap += targetSlot.getAllocatedMemOffHeap();
-                 assignedCpu += targetSlot.getAllocatedCpu();
- 
-                 Double[] worker_resources = {
-                     requestedMemOnHeap, requestedMemOffHeap, requestedCpu,
-                     targetSlot.getAllocatedMemOnHeap(), 
targetSlot.getAllocatedMemOffHeap(), targetSlot.getAllocatedCpu()};
-                 workerResources.put (targetSlot, worker_resources);
-             }
- 
-             Double[] resources = {requestedMemOnHeap, requestedMemOffHeap, 
requestedCpu,
-                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu};
-             LOG.debug("setTopologyResources for {}: requested on-heap mem, 
off-heap mem, cpu: {} {} {} " +
-                             "assigned on-heap mem, off-heap mem, cpu: {} {} 
{}",
-                     td.getId(), requestedMemOnHeap, requestedMemOffHeap, 
requestedCpu,
-                     assignedMemOnHeap, assignedMemOffHeap, assignedCpu);
-             //updating resources used for a topology
-             this.schedulingState.cluster.setTopologyResources(td.getId(), 
resources);
-             this.schedulingState.cluster.setWorkerResources(td.getId(), 
workerResources);
-             return true;
-         } else {
-             LOG.warn("schedulerAssignmentMap for topo {} is null. This 
shouldn't happen!", td.getName());
-             return false;
-         }
-     }
- 
-     private WorkerSlot allocateResourceToSlot (TopologyDetails td, 
Collection<ExecutorDetails> executors, WorkerSlot slot) {
-         double onHeapMem = 0.0;
-         double offHeapMem = 0.0;
-         double cpu = 0.0;
-         for (ExecutorDetails exec : executors) {
-             Double onHeapMemForExec = td.getOnHeapMemoryRequirement(exec);
-             if (onHeapMemForExec != null) {
-                 onHeapMem += onHeapMemForExec;
-             }
-             Double offHeapMemForExec = td.getOffHeapMemoryRequirement(exec);
-             if (offHeapMemForExec != null) {
-                 offHeapMem += offHeapMemForExec;
-             }
-             Double cpuForExec = td.getTotalCpuReqTask(exec);
-             if (cpuForExec != null) {
-                 cpu += cpuForExec;
-             }
-         }
-         return new WorkerSlot(slot.getNodeId(), slot.getPort(), onHeapMem, 
offHeapMem, cpu);
-     }
- 
-     private void updateSupervisorsResources(Cluster cluster, Topologies 
topologies) {
-         Map<String, Double[]> supervisors_resources = new HashMap<String, 
Double[]>();
-         Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster, 
topologies);
-         for (Map.Entry<String, RAS_Node> entry : nodes.entrySet()) {
-             RAS_Node node = entry.getValue();
-             Double totalMem = node.getTotalMemoryResources();
-             Double totalCpu = node.getTotalCpuResources();
-             Double usedMem = totalMem - node.getAvailableMemoryResources();
-             Double usedCpu = totalCpu - node.getAvailableCpuResources();
-             Double[] resources = {totalMem, totalCpu, usedMem, usedCpu};
-             supervisors_resources.put(entry.getKey(), resources);
          }
-         cluster.setSupervisorsResourcesMap(supervisors_resources);
-     }
- 
-     public User getUser(String user) {
-         return this.schedulingState.userMap.get(user);
-     }
- 
-     public Map<String, User> getUserMap() {
-         return this.schedulingState.userMap;
      }
  
      /**

Reply via email to