Merge branch '1.x-branch' into metrics_v2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/57a50f36 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/57a50f36 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/57a50f36 Branch: refs/heads/1.x-branch Commit: 57a50f36df058937587cc08d81c235efc8dc720a Parents: b8de0f3 c4a09d3 Author: P. Taylor Goetz <[email protected]> Authored: Wed Aug 30 17:07:02 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Wed Aug 30 17:07:02 2017 -0400 ---------------------------------------------------------------------- .travis.yml | 5 +- CHANGELOG.md | 1480 ------------------ DEVELOPER.md | 11 +- bin/storm.cmd | 7 + bin/storm.ps1 | 68 + bin/storm.py | 9 +- conf/storm-env.ps1 | 23 + dev-tools/jira-github-join.py | 2 +- dev-tools/jira/__init__.py | 285 ---- dev-tools/jira_github/__init__.py | 285 ++++ dev-tools/release_notes.py | 118 ++ dev-tools/report/report.py | 2 +- docs/Metrics.md | 201 ++- docs/SECURITY.md | 40 +- docs/cgroups_in_storm.md | 71 - docs/distcache-blobstore.md | 8 +- docs/index.md | 1 - docs/storm-kafka-client.md | 147 +- examples/storm-elasticsearch-examples/pom.xml | 10 + .../elasticsearch/bolt/EsIndexTopology.java | 6 +- .../storm/elasticsearch/common/EsTestUtil.java | 14 +- .../trident/TridentEsTopology.java | 3 +- .../TridentKafkaClientWordCountNamedTopics.java | 27 +- ...identKafkaClientWordCountWildcardTopics.java | 5 +- .../java/org/apache/storm/flux/TCKTest.java | 9 + .../resources/configs/substitution-test.yaml | 3 +- .../src/test/resources/configs/test.properties | 1 + .../backends/trident/TestPlanCompiler.java | 2 +- .../apache/storm/common/AbstractAutoCreds.java | 27 +- .../apache/storm/hbase/security/AutoHBase.java | 39 +- .../apache/storm/hdfs/security/AutoHDFS.java | 41 +- .../apache/storm/hive/security/AutoHive.java | 39 +- external/storm-elasticsearch/pom.xml | 44 +- .../DefaultEsLookupResultOutput.java | 62 + .../elasticsearch/ElasticsearchGetRequest.java | 36 - .../elasticsearch/EsLookupResultOutput.java | 5 +- .../elasticsearch/bolt/AbstractEsBolt.java | 53 +- .../storm/elasticsearch/bolt/EsIndexBolt.java | 31 +- .../storm/elasticsearch/bolt/EsLookupBolt.java | 49 +- .../elasticsearch/bolt/EsPercolateBolt.java | 41 +- .../common/DefaultEsTupleMapper.java | 19 + .../storm/elasticsearch/common/EsConfig.java | 130 +- .../elasticsearch/common/EsTupleMapper.java | 9 + .../common/StormElasticSearchClient.java | 37 +- .../common/TransportAddresses.java | 72 - .../apache/storm/elasticsearch/doc/Index.java | 69 + .../storm/elasticsearch/doc/IndexDoc.java | 43 + .../storm/elasticsearch/doc/IndexItem.java | 91 ++ .../storm/elasticsearch/doc/IndexItemDoc.java | 42 + .../apache/storm/elasticsearch/doc/Shards.java | 63 + .../storm/elasticsearch/doc/SourceDoc.java | 43 + .../response/BulkIndexResponse.java | 80 + .../elasticsearch/response/LookupResponse.java | 63 + .../response/PercolateResponse.java | 85 + .../storm/elasticsearch/trident/EsState.java | 72 +- .../elasticsearch/trident/EsStateFactory.java | 15 +- .../bolt/AbstractEsBoltIntegrationTest.java | 68 +- .../elasticsearch/bolt/AbstractEsBoltTest.java | 15 +- .../elasticsearch/bolt/EsIndexBoltTest.java | 44 +- .../bolt/EsLookupBoltIntegrationTest.java | 75 +- .../elasticsearch/bolt/EsLookupBoltTest.java | 67 +- .../elasticsearch/bolt/EsPercolateBoltTest.java | 62 +- .../elasticsearch/common/EsConfigTest.java | 60 +- .../storm/elasticsearch/common/EsTestUtil.java | 101 +- .../common/TransportAddressesTest.java | 81 - .../trident/EsStateFactoryTest.java | 2 +- .../elasticsearch/trident/EsStateTest.java | 98 ++ .../src/test/resources/log4j2.xml | 33 + .../storm/hdfs/bolt/AbstractHdfsBolt.java | 23 +- .../java/org/apache/storm/hdfs/bolt/Writer.java | 35 + .../storm/hdfs/common/AbstractHDFSWriter.java | 16 +- .../org/apache/storm/jms/spout/JmsSpout.java | 291 +++- .../apache/storm/jms/spout/JmsSpoutTest.java | 81 +- .../kafka/spout/EmptyKafkaTupleListener.java | 53 + .../apache/storm/kafka/spout/KafkaSpout.java | 15 +- .../storm/kafka/spout/KafkaSpoutConfig.java | 574 ++++--- .../storm/kafka/spout/KafkaTupleListener.java | 83 + .../spout/ManualPartitionNamedSubscription.java | 78 - .../ManualPartitionPatternSubscription.java | 76 - .../spout/ManualPartitionSubscription.java | 72 + .../storm/kafka/spout/ManualPartitioner.java | 4 +- .../storm/kafka/spout/NamedSubscription.java | 4 +- .../storm/kafka/spout/NamedTopicFilter.java | 68 + .../storm/kafka/spout/PatternSubscription.java | 4 +- .../storm/kafka/spout/PatternTopicFilter.java | 70 + .../kafka/spout/SerializableDeserializer.java | 6 +- .../apache/storm/kafka/spout/Subscription.java | 2 +- .../apache/storm/kafka/spout/TopicFilter.java | 38 + .../internal/KafkaConsumerFactoryDefault.java | 3 +- .../kafka/spout/internal/OffsetManager.java | 9 +- .../spout/trident/KafkaTridentSpoutManager.java | 3 +- .../storm/kafka/spout/KafkaSpoutCommitTest.java | 131 ++ .../storm/kafka/spout/KafkaSpoutConfigTest.java | 17 +- .../storm/kafka/spout/KafkaSpoutEmitTest.java | 53 +- .../kafka/spout/KafkaSpoutRebalanceTest.java | 37 +- .../kafka/spout/KafkaSpoutRetryLimitTest.java | 116 ++ .../kafka/spout/MaxUncommittedOffsetTest.java | 10 +- .../storm/kafka/spout/NamedTopicFilterTest.java | 70 + .../kafka/spout/PatternTopicFilterTest.java | 75 + .../kafka/spout/SingleTopicKafkaSpoutTest.java | 10 +- .../SpoutWithMockedConsumerSetupHelper.java | 87 + .../SingleTopicKafkaSpoutConfiguration.java | 48 +- .../test/KafkaSpoutTopologyMainNamedTopics.java | 4 +- .../KafkaSpoutTopologyMainWildcardTopics.java | 4 +- integration-test/config/install-zookeeper.sh | 2 +- integration-test/run-it.sh | 3 +- .../org/apache/storm/command/config_value.clj | 6 +- .../src/clj/org/apache/storm/converter.clj | 11 +- .../src/clj/org/apache/storm/daemon/common.clj | 4 +- .../clj/org/apache/storm/daemon/executor.clj | 14 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 71 +- .../storm/pacemaker/pacemaker_state_factory.clj | 15 +- storm-core/src/jvm/org/apache/storm/Config.java | 16 +- .../storm/cluster/StormClusterStateImpl.java | 16 +- .../storm/daemon/supervisor/AdvancedFSOps.java | 16 +- .../storm/daemon/supervisor/Container.java | 6 +- .../daemon/supervisor/ReadClusterState.java | 3 + .../apache/storm/daemon/supervisor/Slot.java | 11 + .../storm/daemon/supervisor/Supervisor.java | 16 +- .../daemon/supervisor/SupervisorUtils.java | 11 +- .../daemon/supervisor/timer/UpdateBlobs.java | 26 +- .../jvm/org/apache/storm/drpc/DRPCSpout.java | 6 - .../org/apache/storm/generated/Assignment.java | 114 +- .../apache/storm/generated/LocalAssignment.java | 114 +- .../org/apache/storm/generated/StormBase.java | 114 +- .../apache/storm/localizer/AsyncLocalizer.java | 37 +- .../jvm/org/apache/storm/scheduler/Cluster.java | 85 +- .../apache/storm/scheduler/TopologyDetails.java | 35 +- .../multitenant/MultitenantScheduler.java | 2 +- .../storm/security/INimbusCredentialPlugin.java | 24 +- .../security/auth/ICredentialsRenewer.java | 18 +- .../storm/security/auth/kerberos/AutoTGT.java | 6 +- .../jvm/org/apache/storm/utils/ConfigUtils.java | 31 +- .../org/apache/storm/utils/DisruptorQueue.java | 70 +- .../org/apache/storm/utils/ObjectReader.java | 58 + .../src/jvm/org/apache/storm/utils/Utils.java | 52 +- storm-core/src/py/storm/ttypes.py | 47 +- storm-core/src/storm.thrift | 5 + .../test/clj/org/apache/storm/cluster_test.clj | 8 +- .../scheduler/multitenant_scheduler_test.clj | 77 +- .../scheduler/resource_aware_scheduler_test.clj | 49 +- .../clj/org/apache/storm/scheduler_test.clj | 10 +- .../test/jvm/org/apache/storm/ConfigTest.java | 92 ++ .../storm/daemon/supervisor/ContainerTest.java | 6 +- .../storm/localizer/AsyncLocalizerTest.java | 8 +- .../org/apache/storm/scheduler/ClusterTest.java | 111 ++ .../resource/TestResourceAwareScheduler.java | 183 ++- .../storm/scheduler/resource/TestUser.java | 7 +- .../TestUtilsForResourceAwareScheduler.java | 34 +- .../eviction/TestDefaultEvictionStrategy.java | 118 +- .../TestDefaultResourceAwareStrategy.java | 8 +- .../org/apache/storm/utils/ConfigUtilsTest.java | 98 ++ .../jvm/org/apache/storm/utils/UtilsTest.java | 84 + storm-dist/binary/src/main/assembly/binary.xml | 10 +- .../src/main/resources/resources/storm.py | 2 +- 155 files changed, 5523 insertions(+), 3666 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 95e43f6,52063fc..250ace1 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -233,10 -231,7 +233,10 @@@ (str "executor" executor-id "-send-queue") (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + (.getStormId worker-context) + component-id + (.getThisWorkerPort worker-context) - :producer-type :single-threaded + :producer-type :multi-threaded :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) ] @@@ -820,9 -811,8 +822,9 @@@ (let [delta (tuple-time-delta! tuple)] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (.mark ^Meter (:acked-meter (:executor-data task-data))) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) - (when delta + (when (<= 0 delta) (stats/bolt-acked-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) @@@ -836,9 -826,8 +838,9 @@@ debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (.mark ^Meter (:failed-meter (:executor-data task-data))) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) - (when delta + (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 5c0a2fb,5fd4b84..ca8568c --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@@ -34,9 -34,6 +34,8 @@@ import com.lmax.disruptor.dsl.ProducerT import org.apache.storm.Config; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.internal.RateTracker; +import org.apache.storm.metrics2.DisruptorMetrics; +import org.apache.storm.metrics2.StormMetricRegistry; - import org.apache.storm.task.WorkerTopologyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -62,19 -59,18 +61,19 @@@ import java.util.concurrent.locks.Reent * the ability to catch up to the producer by processing tuples in batches. */ public class DisruptorQueue implements IStatefulObject { -- private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class); ++ private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class); private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); -- + private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); ++ private static int getNumFlusherPoolThreads() { int numThreads = 100; try { -- Map<String, Object> conf = Utils.readStormConfig(); -- numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads); ++ Map<String, Object> conf = Utils.readStormConfig(); ++ numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads); } catch (Exception e) { -- LOG.warn("Error while trying to read system config", e); ++ LOG.warn("Error while trying to read system config", e); } try { String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads)); @@@ -86,8 -82,8 +85,8 @@@ return numThreads; } -- private static class FlusherPool { -- private static final String THREAD_PREFIX = "disruptor-flush"; ++ private static class FlusherPool { ++ private static final String THREAD_PREFIX = "disruptor-flush"; private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true); private ThreadPoolExecutor _exec; private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>(); @@@ -201,8 -197,8 +200,8 @@@ if (block) { _flushLock.lock(); } else if (!_flushLock.tryLock()) { -- //Someone else if flushing so don't do anything -- return; ++ //Someone else if flushing so don't do anything ++ return; } try { while (!_overflow.isEmpty()) { @@@ -256,7 -252,7 +255,7 @@@ } } -- if (!flushed) { ++ if (!flushed) { _overflow.add(_currentBatch); _currentBatch = new ArrayList<Object>(_inputBatchSize); } @@@ -276,8 -272,8 +275,8 @@@ if (block) { _flushLock.lock(); } else if (!_flushLock.tryLock()) { -- //Someone else if flushing so don't do anything -- return; ++ //Someone else if flushing so don't do anything ++ return; } try { while (!_overflow.isEmpty()) { @@@ -349,11 -345,9 +348,17 @@@ return (1.0F * population() / capacity()); } + public double arrivalRate(){ + return _rateTracker.reportRate(); + } + + public double sojournTime(){ ++ return tuplePopulation.get() / Math.max(arrivalRate(), 0.00001) * 1000.0; ++ } ++ + public Object getState() { + Map state = new HashMap<String, Object>(); + // get readPos then writePos so it's never an under-estimate long rp = readPos(); long wp = writePos(); @@@ -393,8 -393,7 +404,8 @@@ private final int _inputBatchSize; private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>(); private final Flusher _flusher; - private final QueueMetrics _metrics; // old metrics API + private final QueueMetrics _metrics; + private final DisruptorMetrics _disruptorMetrics; private String _queueName = ""; private DisruptorBackpressureCallback _cb = null; @@@ -402,9 -401,10 +413,10 @@@ private int _lowWaterMark = 0; private boolean _enableBackpressure = false; private final AtomicLong _overflowCount = new AtomicLong(0); + private final AtomicLong tuplePopulation = new AtomicLong(0); private volatile boolean _throttleOn = false; - public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) { + public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) { this._queueName = PREFIX + queueName; WaitStrategy wait; if (readTimeout <= 0) { @@@ -425,13 -424,6 +437,12 @@@ _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); - + METRICS_TIMER.schedule(new TimerTask(){ + @Override + public void run() { + _disruptorMetrics.set(_metrics); + } - }, 15000, 15000); // TODO: Configurable interval ++ }, 15000, 15000); } public String getName() { @@@ -619,4 -612,4 +631,4 @@@ public QueueMetrics getMetrics() { return _metrics; } --} ++}
