This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f728337  remove deprecated standalone realtime node (#7915)
f728337 is described below

commit f7283378acc7a47e44bfba5a4fe63d086a6f9e10
Author: Clint Wylie <[email protected]>
AuthorDate: Tue Jul 2 18:12:17 2019 -0700

    remove deprecated standalone realtime node (#7915)
    
    * remove CliRealtime, RealtimeManager, etc
    
    * add redirects for deleted page to page that explains the deleted thing
    
    * adjust docs
---
 docs/_redirects.json                               |    8 +-
 docs/content/configuration/index.md                |    7 +-
 docs/content/configuration/realtime.md             |   98 --
 docs/content/dependencies/zookeeper.md             |    4 +-
 docs/content/design/realtime.md                    |   80 --
 docs/content/development/overview.md               |    3 +-
 docs/content/ingestion/firehose.md                 |    6 +-
 docs/content/ingestion/ingestion-spec.md           |    2 -
 docs/content/ingestion/standalone-realtime.md      |   43 +
 docs/content/ingestion/stream-pull.md              |  376 -------
 .../druid/guice/FireDepartmentsProvider.java       |   59 --
 .../apache/druid/guice/RealtimeManagerConfig.java  |   37 -
 .../druid/segment/realtime/RealtimeManager.java    |  393 -------
 .../segment/realtime/RealtimeManagerTest.java      | 1104 --------------------
 .../firehose/CombiningFirehoseFactoryTest.java     |    3 +-
 .../java/org/apache/druid/cli/CliRealtime.java     |   73 --
 .../org/apache/druid/cli/CliRealtimeExample.java   |  131 ---
 .../src/main/java/org/apache/druid/cli/Main.java   |    6 -
 .../org/apache/druid/guice/RealtimeModule.java     |  133 ---
 .../test/java/org/apache/druid/cli/MainTest.java   |    2 -
 20 files changed, 56 insertions(+), 2512 deletions(-)

diff --git a/docs/_redirects.json b/docs/_redirects.json
index 18cdf69..508aedf 100644
--- a/docs/_redirects.json
+++ b/docs/_redirects.json
@@ -59,9 +59,9 @@
   {"source": "Post-aggregations.html", "target": 
"querying/post-aggregations.html"},
   {"source": "Query-Context.html", "target": "querying/query-context.html"},
   {"source": "Querying.html", "target": "querying/querying.html"},
-  {"source": "Realtime-Config.html", "target": "configuration/realtime.html"},
+  {"source": "Realtime-Config.html", "target": 
"ingestion/standalone-realtime.html"},
+  {"source": "Realtime.html", "target": "ingestion/standalone-realtime.html"},
   {"source": "Realtime-ingestion.html", "target": 
"ingestion/stream-ingestion.html"},
-  {"source": "Realtime.html", "target": "design/realtime.html"},
   {"source": "Recommendations.html", "target": 
"operations/recommendations.html"},
   {"source": "Rolling-Updates.html", "target": 
"operations/rolling-updates.html"},
   {"source": "Router.html", "target": "development/router.html"},
@@ -167,4 +167,8 @@
   {"source": "development/extensions-core/namespaced-lookup.html", "target": 
"lookups-cached-global.html"},
   {"source": "operations/performance-faq.html", "target": 
"../operations/basic-cluster-tuning.html"},
   {"source": "development/extensions-contrib/orc.html", "target": 
"../extensions-core/orc.html"}
+  {"source": "operations/performance-faq.html", "target": 
"../operations/basic-cluster-tuning.html"},
+  {"source": "configuration/realtime.md", "target": 
"../ingestion/standalone-realtime.html"},
+  {"source": "design/realtime.md", "target": 
"../ingestion/standalone-realtime.html"},
+  {"source": "ingestion/stream-pull.md", "target": 
"../ingestion/standalone-realtime.html"}
 ]
diff --git a/docs/content/configuration/index.md 
b/docs/content/configuration/index.md
index 0f70489..c73fb36 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -87,7 +87,6 @@ This page documents all of the configuration properties for 
each Druid service t
         * [Segment Discovery](#segment-discovery)
   * [Caching](#cache-configuration)
   * [General Query Configuration](#general-query-configuration)
-  * [Realtime processes (Deprecated)](#realtime-processes)
 
 ## Recommended Configuration File Organization
 
@@ -493,7 +492,7 @@ To use graphite as emitter set `druid.emitter=graphite`. 
For configuration detai
 
 ### Metadata Storage
 
-These properties specify the jdbc connection and other configuration around 
the metadata storage. The only processes that connect to the metadata storage 
with these properties are the [Coordinator](../design/coordinator.html), 
[Overlord](../design/overlord.html) and [Realtime 
Processes](../design/realtime.html).
+These properties specify the jdbc connection and other configuration around 
the metadata storage. The only processes that connect to the metadata storage 
with these properties are the [Coordinator](../design/coordinator.html) and 
[Overlord](../design/overlord.html).
 
 |Property|Description|Default|
 |--------|-----------|-------|
@@ -1672,7 +1671,3 @@ Supported query contexts:
 |`maxResults`|Can be used to lower the value of 
`druid.query.groupBy.maxResults` for this query.|None|
 |`useOffheap`|Set to true to store aggregations off-heap when merging 
results.|false|
 
-
-## Realtime processes
-
-Configuration for the deprecated realtime process can be found 
[here](../configuration/realtime.html).
diff --git a/docs/content/configuration/realtime.md 
b/docs/content/configuration/realtime.md
deleted file mode 100644
index dd319fe..0000000
--- a/docs/content/configuration/realtime.md
+++ /dev/null
@@ -1,98 +0,0 @@
----
-layout: doc_page
-title: "Realtime Process Configuration"
----
-
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-# Realtime Process Configuration
-
-For general Apache Druid (incubating) Realtime Process information, see 
[here](../design/realtime.html).
-
-Runtime Configuration
----------------------
-
-The realtime process uses several of the global configs in 
[Configuration](../configuration/index.html) and has the following set of 
configurations as well:
-
-### Process Config
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.host`|The host for the current process. This is used to advertise the 
current processes location as reachable from another process and should 
generally be specified such that `http://${druid.host}/` could actually talk to 
this process|InetAddress.getLocalHost().getCanonicalHostName()|
-|`druid.plaintextPort`|This is the port to actually listen on; unless port 
mapping is used, this will be the same port as is on `druid.host`|8084|
-|`druid.tlsPort`|TLS port for HTTPS connector, if 
[druid.enableTlsPort](../operations/tls-support.html) is set then this config 
will be used. If `druid.host` contains port then that port will be ignored. 
This should be a non-negative Integer.|8284|
-|`druid.service`|The name of the service. This is used as a dimension when 
emitting metrics and alerts to differentiate between the various 
services|druid/realtime|
-
-### Realtime Operation
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.publish.type`|Where to publish segments. Choices are "noop" or 
"metadata".|metadata|
-|`druid.realtime.specFile`|File location of realtime specFile.|none|
-
-### Storing Intermediate Segments
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.segmentCache.locations`|Where intermediate segments are stored. The 
maxSize should always be zero.|none|
-
-
-### Query Configs
-
-#### Processing
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the 
storage of intermediate results. The computation engine in both the Historical 
and Realtime processes will use a scratch buffer of this size to do all of 
their intermediate computations off-heap. Larger values allow for more 
aggregations in a single pass over the data while smaller values can require 
more passes depending on the query that is being executed.|auto (max 1GB)|
-|`druid.processing.formatString`|Realtime and Historical processes use this 
format string to name their processing threads.|processing-%s|
-|`druid.processing.numMergeBuffers`|The number of direct memory buffers 
available for merging query results. The buffers are sized by 
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency 
limit for queries that require merging buffers. If you are using any queries 
that require merge buffers (currently, just groupBy v2) then you should have at 
least two of these.|`max(2, druid.processing.numThreads / 4)`|
-|`druid.processing.numThreads`|The number of processing threads to have 
available for parallel processing of segments. Our rule of thumb is `num_cores 
- 1`, which means that even under heavy load there will still be one core 
available to do background tasks like talking with ZooKeeper and pulling down 
segments. If only one core is available, this property defaults to the value 
`1`.|Number of cores - 1 (or 1)|
-|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the 
dimension value lookup cache. Any value greater than `0` enables the cache. It 
is currently disabled by default. Enabling the lookup cache can significantly 
improve the performance of aggregators operating on dimension values, such as 
the JavaScript aggregator, or cardinality aggregator, but can slow things down 
if the cache hit rate is low (i.e. dimensions with few repeating values). 
Enabling it may also require add [...]
-|`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-
-The amount of direct memory needed by Druid is at least
-`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + 
druid.processing.numThreads + 1)`. You can
-ensure at least this amount of direct memory is available by providing 
`-XX:MaxDirectMemorySize=<VALUE>` at the command
-line.
-
-#### General Query Configuration
-
-##### GroupBy Query Config
-
-See [groupBy server 
configuration](../querying/groupbyquery.html#server-configuration).
-
-##### Search Query Config
-
-|Property|Description|Default|
-|--------|-----------|-------|
-|`druid.query.search.maxSearchLimit`|Maximum number of search results to 
return.|1000|
-
-### Caching
-
-You can optionally configure caching to be enabled on the realtime process by 
setting caching configs here.
-
-|Property|Possible Values|Description|Default|
-|--------|---------------|-----------|-------|
-|`druid.realtime.cache.useCache`|true, false|Enable the cache on the 
realtime.|false|
-|`druid.realtime.cache.populateCache`|true, false|Populate the cache on the 
realtime.|false|
-|`druid.realtime.cache.unCacheable`|All druid query types|All query types to 
not cache.|`["select"]`|
-|`druid.realtime.cache.maxEntrySize`|positive integer or -1|Maximum size of an 
individual cache entry (processed results for one segment), in bytes, or -1 for 
unlimited.|`1000000` (1MB)|
-
-See [cache configuration](index.html#cache-configuration) for how to configure 
cache settings.
diff --git a/docs/content/dependencies/zookeeper.md 
b/docs/content/dependencies/zookeeper.md
index a41e815..5143f92 100644
--- a/docs/content/dependencies/zookeeper.md
+++ b/docs/content/dependencies/zookeeper.md
@@ -27,7 +27,7 @@ title: "ZooKeeper"
 Apache Druid (incubating) uses [Apache 
ZooKeeper](http://zookeeper.apache.org/) (ZK) for management of current cluster 
state. The operations that happen over ZK are
 
 1.  [Coordinator](../design/coordinator.html) leader election
-2.  Segment "publishing" protocol from [Historical](../design/historical.html) 
and [Realtime](../design/realtime.html)
+2.  Segment "publishing" protocol from [Historical](../design/historical.html)
 3.  Segment load/drop protocol between 
[Coordinator](../design/coordinator.html) and 
[Historical](../design/historical.html)
 4.  [Overlord](../design/overlord.html) leader election
 5.  [Overlord](../design/overlord.html) and 
[MiddleManager](../design/middlemanager.html) task management
@@ -44,7 +44,7 @@ ${druid.zk.paths.coordinatorPath}/_COORDINATOR
 
 The `announcementsPath` and `servedSegmentsPath` are used for this.
 
-All [Historical](../design/historical.html) and 
[Realtime](../design/realtime.html) processes publish themselves on the 
`announcementsPath`, specifically, they will create an ephemeral znode at
+All [Historical](../design/historical.html) processes publish themselves on 
the `announcementsPath`, specifically, they will create an ephemeral znode at
 
 ```
 ${druid.zk.paths.announcementsPath}/${druid.host}
diff --git a/docs/content/design/realtime.md b/docs/content/design/realtime.md
deleted file mode 100644
index df6b4e0..0000000
--- a/docs/content/design/realtime.md
+++ /dev/null
@@ -1,80 +0,0 @@
----
-layout: doc_page
-title: "Real-time Process"
----
-
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-# Real-time Process
-
-<div class="note info">
-NOTE: Realtime processes are deprecated. Please use the <a 
href="../development/extensions-core/kafka-ingestion.html">Kafka Indexing 
Service</a> for stream pull use cases instead. 
-</div>
-
-For Apache Druid (incubating) Real-time Process Configuration, see [Realtime 
Configuration](../configuration/realtime.html).
-
-For Real-time Ingestion, see [Realtime 
Ingestion](../ingestion/stream-ingestion.html).
-
-Realtime processes provide a realtime index. Data indexed via these processes 
is immediately available for querying. Realtime processes will periodically 
build segments representing the data they’ve collected over some span of time 
and transfer these segments off to [Historical](../design/historical.html) 
processes. They use ZooKeeper to monitor the transfer and the metadata storage 
to store metadata about the transferred segment. Once transfered, segments are 
forgotten by the Realtime p [...]
-
-### Running
-
-```
-org.apache.druid.cli.Main server realtime
-```
-Segment Propagation
--------------------
-
-The segment propagation diagram for real-time data ingestion can be seen below:
-
-![Segment Propagation](../../img/segmentPropagation.png "Segment Propagation")
-
-You can read about the various components shown in this diagram under the 
Architecture section (see the menu on the right). Note that some of the names 
are now outdated.
-
-### Firehose
-
-See [Firehose](../ingestion/firehose.html).
-
-### Plumber
-
-See [Plumber](../design/plumber.html)
-
-Extending the code
-------------------
-
-Realtime integration is intended to be extended in two ways:
-
-1.  Connect to data streams from varied systems 
([Firehose](https://github.com/apache/incubator-druid/blob/master/core/src/main/org/apache/druid/data/input/FirehoseFactory.java))
-2.  Adjust the publishing strategy to match your needs 
([Plumber](https://github.com/apache/incubator-druid/blob/master/server/src/main/java/org/apache/druid/segment/realtime/plumber/PlumberSchool.java))
-
-The expectations are that the former will be very common and something that 
users of Druid will do on a fairly regular basis. Most users will probably 
never have to deal with the latter form of customization. Indeed, we hope that 
all potential use cases can be packaged up as part of Druid proper without 
requiring proprietary customization.
-
-Given those expectations, adding a firehose is straightforward and completely 
encapsulated inside of the interface. Adding a plumber is more involved and 
requires understanding of how the system works to get right, it’s not 
impossible, but it’s not intended that individuals new to Druid will be able to 
do it immediately.
-
-HTTP Endpoints
---------------
-
-The real-time process exposes several HTTP endpoints for interactions.
-
-### GET
-
-* `/status`
-
-Returns the Druid version, loaded extensions, memory used, total memory and 
other useful information about the process.
diff --git a/docs/content/development/overview.md 
b/docs/content/development/overview.md
index ad360a5..db89fce 100644
--- a/docs/content/development/overview.md
+++ b/docs/content/development/overview.md
@@ -54,8 +54,7 @@ Most of the coordination logic for (real-time) ingestion is 
in the Druid indexin
 ## Real-time Ingestion
 
 Druid loads data through `FirehoseFactory.java` classes. Firehoses often wrap 
other firehoses, where, similar to the design of the  
-query runners, each firehose adds a layer of logic. Much of the core 
management logic is in `RealtimeManager.java` and the 
-persist and hand-off logic is in `RealtimePlumber.java`.
+query runners, each firehose adds a layer of logic. The persist and 
hand-off logic is in `RealtimePlumber.java`.
 
 ## Hadoop-based Batch Ingestion
 
diff --git a/docs/content/ingestion/firehose.md 
b/docs/content/ingestion/firehose.md
index f35bcc0..274e3b6 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -24,7 +24,7 @@ title: "Apache Druid (incubating) Firehoses"
 
 # Apache Druid (incubating) Firehoses
 
-Firehoses are used in [native batch ingestion 
tasks](../ingestion/native_tasks.html), stream push tasks automatically created 
by [Tranquility](../ingestion/stream-push.html), and the [stream-pull 
(deprecated)](../ingestion/stream-pull.html) ingestion model.
+Firehoses are used in [native batch ingestion 
tasks](../ingestion/native_tasks.html) and in stream push tasks automatically 
created by [Tranquility](../ingestion/stream-push.html).
 
 They are pluggable and thus the configuration schema can and will vary based 
on the `type` of the firehose.
 
@@ -204,9 +204,7 @@ This can be used to merge data from more than one firehose.
 
 ### Streaming Firehoses
 
-The firehoses shown below should only be used with the [stream-pull 
(deprecated)](../ingestion/stream-pull.html) ingestion model, as they are not 
suitable for batch ingestion.
-
-The EventReceiverFirehose is also used in tasks automatically generated by 
[Tranquility stream push](../ingestion/stream-push.html).
+The EventReceiverFirehose is used in tasks automatically generated by 
[Tranquility stream push](../ingestion/stream-push.html). These firehoses are 
not suitable for batch ingestion.
 
 #### EventReceiverFirehose
 
diff --git a/docs/content/ingestion/ingestion-spec.md 
b/docs/content/ingestion/ingestion-spec.md
index 3b03c5f..df37f5b 100644
--- a/docs/content/ingestion/ingestion-spec.md
+++ b/docs/content/ingestion/ingestion-spec.md
@@ -310,7 +310,6 @@ The IOConfig spec differs based on the ingestion task type.
 * Hadoop Batch ingestion: See [Hadoop Batch 
IOConfig](../ingestion/hadoop.html#ioconfig)
 * Kafka Indexing Service: See [Kafka Supervisor 
IOConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorIOConfig)
 * Stream Push Ingestion: Stream push ingestion with Tranquility does not 
require an IO Config.
-* Stream Pull Ingestion (Deprecated): See [Stream pull 
ingestion](../ingestion/stream-pull.html#ioconfig).
 
 # Tuning Config
 
@@ -320,7 +319,6 @@ The TuningConfig spec differs based on the ingestion task 
type.
 * Hadoop Batch ingestion: See [Hadoop Batch 
TuningConfig](../ingestion/hadoop.html#tuningconfig)
 * Kafka Indexing Service: See [Kafka Supervisor 
TuningConfig](../development/extensions-core/kafka-ingestion.html#KafkaSupervisorTuningConfig)
 * Stream Push Ingestion (Tranquility): See [Tranquility 
TuningConfig](http://static.druid.io/tranquility/api/latest/#com.metamx.tranquility.druid.DruidTuning).
-* Stream Pull Ingestion (Deprecated): See [Stream pull 
ingestion](../ingestion/stream-pull.html#tuningconfig).
 
 # Evaluating Timestamp, Dimensions and Metrics
 
diff --git a/docs/content/ingestion/standalone-realtime.md 
b/docs/content/ingestion/standalone-realtime.md
new file mode 100644
index 0000000..81ce89d
--- /dev/null
+++ b/docs/content/ingestion/standalone-realtime.md
@@ -0,0 +1,43 @@
+---
+layout: doc_page
+title: "Realtime Process"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Realtime Process
+
+Older versions of Apache Druid (incubating) supported a standalone 'Realtime' 
process to query and index 'stream pull'
+modes of real-time ingestion. These processes would periodically build 
segments for the data they had collected over
+some span of time and then set up hand-off to 
[Historical](../design/historical.html) servers.
+
+These processes could be invoked by
+
+```
+org.apache.druid.cli.Main server realtime
+```
+
+This model of stream pull ingestion was deprecated for a number of both 
operational and architectural reasons, and
+removed completely in Druid 0.16.0. Operationally, realtime nodes were 
difficult to configure, deploy, and scale because
+each node required a unique configuration. The design of the stream pull 
ingestion system for realtime nodes also
+suffered from limitations which made it impossible to achieve exactly-once 
ingestion.
+
+Please consider using the [Kafka Indexing 
Service](../development/extensions-core/kafka-ingestion.html) or
+[Kinesis Indexing 
Service](../development/extensions-core/kinesis-ingestion.html) for stream pull 
ingestion instead.
diff --git a/docs/content/ingestion/stream-pull.md 
b/docs/content/ingestion/stream-pull.md
deleted file mode 100644
index 38f6a80..0000000
--- a/docs/content/ingestion/stream-pull.md
+++ /dev/null
@@ -1,376 +0,0 @@
----
-layout: doc_page
-title: "Stream Pull Ingestion"
----
-
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
-  -->
-
-<div class="note info">
-NOTE: Realtime processes are deprecated. Please use the <a 
href="../development/extensions-core/kafka-ingestion.html">Kafka Indexing 
Service</a> for stream pull use cases instead.
-</div>
-
-# Stream Pull Ingestion
-
-If you have an external service that you want to pull data from, you have two 
options. The simplest
-option is to set up a "copying" service that reads from the data source and 
writes to Apache Druid (incubating) using
-the [stream push method](stream-push.html).
-
-Another option is *stream pull*. With this approach, a Druid Realtime Process 
ingests data from a
-[Firehose](../ingestion/firehose.html) connected to the data you want to
-read. The Druid quickstart and tutorials do not include information about how 
to set up standalone realtime processes, but
-they can be used in place for Tranquility server and the indexing service. 
Please note that Realtime processes have different properties and roles than 
the indexing service.
-
-## Realtime Process Ingestion
-
-Much of the configuration governing Realtime processes and the ingestion of 
data is set in the Realtime spec file, discussed on this page.
-
-For general Real-time Process information, see [here](../design/realtime.html).
-
-For Real-time Process Configuration, see [Realtime 
Configuration](../configuration/realtime.html).
-
-For writing your own plugins to the real-time process, see 
[Firehose](../ingestion/firehose.html).
-
-## Realtime "specFile"
-
-The property `druid.realtime.specFile` has the path of a file (absolute or 
relative path and file name) with realtime specifications in it. This 
"specFile" should be a JSON Array of JSON objects like the following:
-
-```json
-[
-  {
-    "dataSchema" : {
-      "dataSource" : "wikipedia",
-      "parser" : {
-        "type" : "string",
-        "parseSpec" : {
-          "format" : "json",
-          "timestampSpec" : {
-            "column" : "timestamp",
-            "format" : "auto"
-          },
-          "dimensionsSpec" : {
-            "dimensions": 
["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
-            "dimensionExclusions" : [],
-            "spatialDimensions" : []
-          }
-        }
-      },
-      "metricsSpec" : [{
-        "type" : "count",
-        "name" : "count"
-      }, {
-        "type" : "doubleSum",
-        "name" : "added",
-        "fieldName" : "added"
-      }, {
-        "type" : "doubleSum",
-        "name" : "deleted",
-        "fieldName" : "deleted"
-      }, {
-        "type" : "doubleSum",
-        "name" : "delta",
-        "fieldName" : "delta"
-      }],
-      "granularitySpec" : {
-        "type" : "uniform",
-        "segmentGranularity" : "DAY",
-        "queryGranularity" : "NONE"
-      }
-    },
-    "ioConfig" : {
-      "type" : "realtime",
-      "firehose": {
-        "type": "kafka-0.8",
-        "consumerProps": {
-          "zookeeper.connect": "localhost:2181",
-          "zookeeper.connection.timeout.ms" : "15000",
-          "zookeeper.session.timeout.ms" : "15000",
-          "zookeeper.sync.time.ms" : "5000",
-          "group.id": "druid-example",
-          "fetch.message.max.bytes" : "1048586",
-          "auto.offset.reset": "largest",
-          "auto.commit.enable": "false"
-        },
-        "feed": "wikipedia"
-      },
-      "plumber": {
-        "type": "realtime"
-      }
-    },
-    "tuningConfig": {
-      "type" : "realtime",
-      "maxRowsInMemory": 1000000,
-      "intermediatePersistPeriod": "PT10M",
-      "windowPeriod": "PT10M",
-      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
-      "rejectionPolicy": {
-        "type": "serverTime"
-      }
-    }
-  }
-]
-```
-
-This is a JSON Array so you can give more than one realtime stream to a given 
process. The number you can put in the same process depends on the exact 
configuration. In general, it is best to think of each realtime stream handler 
as requiring 2-threads: 1 thread for data consumption and aggregation, 1 thread 
for incremental persists and other background tasks.
-
-There are three parts to a realtime stream specification, `dataSchema`, 
`IOConfig`, and `tuningConfig` which we will go into here.
-
-### DataSchema
-
-This field is required.
-
-See [Ingestion](../ingestion/index.html)
-
-### IOConfig
-
-This field is required.
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|This should always be 'realtime'.|yes|
-|firehose|JSON Object|Where the data is coming from. Described in detail 
below.|yes|
-|plumber|JSON Object|Where the data is going. Described in detail below.|yes|
-
-#### Firehose
-
-See [Firehose](../ingestion/firehose.html) for more information on various 
firehoses.
-
-#### Plumber
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|This should always be 'realtime'.|no|
-
-### TuningConfig
-
-The tuningConfig is optional and default parameters will be used if no 
tuningConfig is specified.
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|This should always be 'realtime'.|no|
-|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. 
This number is the post-aggregation rows, so it is not equivalent to the number 
of input events, but the number of aggregated rows that those events result in. 
This is used to manage the required JVM heap size. Maximum heap memory usage 
for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no 
(default == 1000000)|
-|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to 
aggregate before persisting. This is used to manage the required JVM heap 
size.|no (default == One-sixth of max JVM memory)|
-|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. 
This is configured with a 10 minute window, meaning that any event more than 10 
minutes ago will be thrown away and not included in the segment generated by 
the realtime server.|no (default == PT10M)|
-|intermediatePersistPeriod|ISO8601 Period String|The period that determines 
the rate at which intermediate persists occur. These persists determine how 
often commits happen against the incoming realtime stream. If the realtime data 
loading process is interrupted at time T, it should be restarted to re-read 
data that arrived at T minus this period.|no (default == PT10M)|
-|basePersistDirectory|String|The directory to put things that need 
persistence. The plumber is responsible for the actual intermediate persists 
and this tells it where to store those persists.|no (default == java tmp dir)|
-|versioningPolicy|Object|How to version segments.|no (default == based on 
segment start time)|
-|rejectionPolicy|Object|Controls how data sets the data acceptance policy for 
creating and handing off segments. More on this below.|no (default == 
'serverTime')|
-|maxPendingPersists|Integer|Maximum number of persists that can be pending, 
but not started. If this limit would be exceeded by a new intermediate persist, 
ingestion will block until the currently-running persist finishes. Maximum heap 
memory usage for indexing scales with maxRowsInMemory * (2 + 
maxPendingPersists).|no (default == 0; meaning one persist can be running 
concurrently with ingestion, and none can be queued up)|
-|shardSpec|Object|This describes the shard that is represented by this server. 
This must be specified properly in order to have multiple realtime processes 
indexing the same data stream in a [sharded fashion](#sharding).|no (default == 
'NoneShardSpec')|
-|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, 
this will set the thread priority of the persisting thread to 
`Thread.NORM_PRIORITY` plus this value within the bounds of 
`Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not 
change the thread priority.|no (default == 0; inherit and do not override)|
-|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, 
this will set the thread priority of the merging thread to 
`Thread.NORM_PRIORITY` plus this value within the bounds of 
`Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not 
change the thread priority.|no (default == 0; inherit and do not override)|
-|reportParseExceptions|Boolean|If true, exceptions encountered during parsing 
will be thrown and will halt ingestion. If false, unparseable rows and fields 
will be skipped. If an entire row is skipped, the "unparseable" counter will be 
incremented. If some fields in a row were parseable and some were not, the 
parseable fields will be indexed and the "unparseable" counter will not be 
incremented.|no (default == false)|
-|handoffConditionTimeout|long|Milliseconds to wait for segment handoff. It 
must be >= 0, where 0 means to wait forever.|no (default == 0)|
-|alertTimeout|long|Milliseconds timeout after which an alert is created if the 
task isn't finished by then. This allows users to monitor tasks that are 
failing to finish and give up the worker slot for any unexpected errors.|no 
(default == 0)|
-|segmentWriteOutMediumFactory|Object|Segment write-out medium to use when 
creating segments. See below for more information.|no (not specified by 
default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` 
is used)|
-|dedupColumn|String|the column to judge whether this row is already in this 
segment, if so, throw away this row. If it is String type column, to reduce 
heap cost, use long type hashcode of this column's value to judge whether this 
row is already ingested, so there maybe very small chance to throw away a row 
that is not ingested before.|no (default == null)|
-|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
-
-Before enabling thread priority settings, users are highly encouraged to read 
the [original pull request](https://github.com/apache/incubator-druid/pull/984) 
and other documentation about proper use of `-XX:+UseThreadPriorities`.
-
-#### Rejection Policy
-
-The following policies are available:
-
-* `serverTime` &ndash; The recommended policy for "current time" data, it is 
optimal for current data that is generated and ingested in real time. Uses 
`windowPeriod` to accept only those events that are inside the window looking 
forward and back.
-* `messageTime` &ndash; Can be used for non-"current time" as long as that 
data is relatively in sequence. Events are rejected if they are less than 
`windowPeriod` from the event with the latest timestamp. Hand off only occurs 
if an event is seen after the segmentGranularity and `windowPeriod` (hand off 
will not periodically occur unless you have a constant stream of data).
-* `none` &ndash; All events are accepted. Never hands off data unless 
shutdown() is called on the configured firehose.
-
-#### SegmentWriteOutMediumFactory
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|See [Additional Peon Configuration: 
SegmentWriteOutMediumFactory](../configuration/index.html#segmentwriteoutmediumfactory)
 for explanation and available options.|yes|
-
-#### IndexSpec
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; 
see below for options.|no (defaults to Concise)|
-|dimensionCompression|String|Compression format for dimension columns. Choose 
from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
-|metricCompression|String|Compression format for metric columns. Choose from 
`LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
-
-##### Bitmap types
-
-For Concise bitmaps:
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|Must be `concise`.|yes|
-
-For Roaring bitmaps:
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|Must be `roaring`.|yes|
-|compressRunOnSerialization|Boolean|Use a run-length encoding where it is 
estimated as more space efficient.|no (default == `true`)|
-
-#### Sharding
-
-Druid uses shards, or segments with partition numbers, to more efficiently 
handle large amounts of incoming data. In Druid, shards represent the segments 
that together cover a time interval based on the value of `segmentGranularity`. 
If, for example, `segmentGranularity` is set to "hour", then a number of shards 
may be used to store the data for that hour. Sharding along dimensions may also 
occur to optimize efficiency.
-
-Segments are identified by datasource, time interval, and version. With 
sharding, a segment is also identified by a partition number. Typically, each 
shard will have the same version but a different partition number to uniquely 
identify it.
-
-In small-data scenarios, sharding is unnecessary and can be set to none (the 
default):
-
-```json
-    "shardSpec": {"type": "none"}
-```
-
-However, in scenarios with multiple realtime processes, `none` is less useful 
as it cannot help with scaling data volume (see below). Note that for the batch 
indexing service, no explicit configuration is required; sharding is provided 
automatically.
-
-Druid uses sharding based on the `shardSpec` setting you configure. The 
recommended choices, `linear` and `numbered`, are discussed below; other types 
have been useful for internal Druid development but are not appropriate for 
production setups.
-
-Keep in mind, that sharding configuration has nothing to do with configured 
firehose. For example, if you set partition number to 0, it doesn't mean that 
Kafka firehose will consume only from 0 topic partition.
-
-##### Linear
-
-This strategy provides following advantages:
-
-* There is no need to update the fileSpec configurations of existing processes 
when adding new processes.
-* All unique shards are queried, regardless of whether the partition numbering 
is sequential or not (it allows querying of partitions 0 and 2, even if 
partition 1 is missing).
-
-Configure `linear` under `schema`:
-
-```json
-    "shardSpec": {
-        "type": "linear",
-        "partitionNum": 0
-    }
-```
-
-
-##### Numbered
-
-This strategy is similar to `linear` except that it does not tolerate 
non-sequential partition numbering (it will *not* allow querying of partitions 
0 and 2 if partition 1 is missing). It also requires explicitly setting the 
total number of partitions.
-
-Configure `numbered` under `schema`:
-
-```json
-    "shardSpec": {
-        "type": "numbered",
-        "partitionNum": 0,
-        "partitions": 2
-    }
-```
-
-
-##### Scale and Redundancy
-
-The `shardSpec` configuration can be used to create redundancy by having the 
same `partitionNum` values on different processes.
-
-For example, if RealTimeProcess1 has:
-
-```json
-    "shardSpec": {
-        "type": "linear",
-        "partitionNum": 0
-    }
-```
-
-and RealTimeProcess2 has:
-
-```json
-    "shardSpec": {
-        "type": "linear",
-        "partitionNum": 0
-    }
-```
-
-then two realtime processes can store segments with the same datasource, 
version, time interval, and partition number. Brokers that query for data in 
such segments will assume that they hold the same data, and the query will 
target only one of the segments.
-
-`shardSpec` can also help achieve scale. For this, add processes with a 
different `partionNum`. Continuing with the example, if RealTimeProcess3 has:
-
-```json
-    "shardSpec": {
-        "type": "linear",
-        "partitionNum": 1
-    }
-```
-
-then it can store segments with the same datasource, time interval, and 
version as in the first two processes, but with a different partition number. 
Brokers that query for data in such segments will assume that a segment from 
RealTimeProcess3 holds *different* data, and the query will target it along 
with a segment from the first two processes.
-
-You can use type `numbered` similarly. Note that type `none` is essentially 
type `linear` with all shards having a fixed `partitionNum` of 0.
-
-## Constraints
-
-The following table summarizes constraints between settings in the spec file 
for the Realtime subsystem.
-
-|Name|Effect|Minimum|Recommended|
-|----|------|-------|-----------|
-|windowPeriod| When reading a row, events with timestamp older than now minus 
this window are discarded | time jitter tolerance | use this to reject outliers 
|
-|segmentGranularity| Time granularity (minute, hour, day, week, month) for 
loading data at query time | equal to indexGranularity| more than 
queryGranularity|
-|queryGranularity| Time granularity (minute, hour, day, week, month) for 
rollup | less than segmentGranularity| minute, hour, day, week, month |
-|intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of 
ingested rows from memory to disk | avoid excessive flushing | number of 
un-persisted rows in memory also constrained by maxRowsInMemory |
-|maxRowsInMemory| The max number of ingested rows to hold in memory before a 
flush to disk. Normally user does not need to set this, but depending on the 
nature of data, if rows are short in terms of bytes, user may not want to store 
a million rows in memory and this value should be set| number of un-persisted 
post-aggregation rows in memory is also constrained by 
intermediatePersistPeriod | use this to avoid running out of heap if too many 
rows in an intermediatePersistPeriod |
-|maxBytesInMemory| The number of bytes to keep in memory before a flush to 
disk. Normally this is computed internally and user does not need to set it. 
This is based on a rough estimate of memory usage and not actual usage. The 
maximum heap memory usage for indexing is maxBytesInMemory * (2 + 
maxPendingPersists)| number of un-persisted post-aggregation bytes in memory is 
also constrained by intermediatePersistPeriod | use this to avoid running out 
of heap if too many rows in an intermedi [...]
-
-The normal, expected use cases have the following overall constraints: 
`intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and 
`queryGranularity ≤ segmentGranularity`
-
-## Limitations
-
-### Kafka
-
-Standalone realtime processes use the Kafka high level consumer, which imposes 
a few restrictions.
-
-Druid replicates segment such that logically equivalent data segments are 
concurrently hosted on N processes. If N–1 processes go down,
-the data will still be available for querying. On real-time processes, this 
process depends on maintaining logically equivalent
-data segments on each of the N processes, which is not possible with standard 
Kafka consumer groups if your Kafka topic requires more than one consumer
-(because consumers in different consumer groups will split up the data 
differently).
-
-For example, let's say your topic is split across Kafka partitions 1, 2, & 3 
and you have 2 real-time processes with linear shard specs 1 & 2.
-Both of the real-time processes are in the same consumer group. Real-time 
process 1 may consume data from partitions 1 & 3, and real-time process 2 may 
consume data from partition 2.
-Querying for your data through the Broker will yield correct results.
-
-The problem arises if you want to replicate your data by creating real-time 
processes 3 & 4. These new real-time processes also
-have linear shard specs 1 & 2, and they will consume data from Kafka using a 
different consumer group. In this case,
-real-time process 3 may consume data from partitions 1 & 2, and real-time 
process 4 may consume data from partition 2.
-From Druid's perspective, the segments hosted by real-time processes 1 and 3 
are the same, and the data hosted by real-time processes
-2 and 4 are the same, although they are reading from different Kafka 
partitions. Querying for the data will yield inconsistent
-results.
-
-Is this always a problem? No. If your data is small enough to fit on a single 
Kafka partition, you can replicate without issues.
-Otherwise, you can run real-time processes without replication.
-
-Please note that druid will skip over event that failed its checksum and it is 
corrupt.
-
-### Locking
-
-Using stream pull ingestion with Realtime processes together batch ingestion 
may introduce data override issues. For example, if you
-are generating hourly segments for the current day, and run a daily batch job 
for the current day's data, the segments created by
-the batch job will have a more recent version than most of the segments 
generated by realtime ingestion. If your batch job is indexing
-data that isn't yet complete for the day, the daily segment created by the 
batch job can override recent segments created by
-realtime processes. A portion of data will appear to be lost in this case.
-
-### Schema changes
-
-Standalone realtime processes require stopping a process to update a schema, 
and starting it up again for the schema to take effect.
-This can be difficult to manage at scale, especially with multiple partitions.
-
-### Log management
-
-Each standalone realtime process has its own set of logs. Diagnosing errors 
across many partitions across many servers may be
-difficult to manage and track at scale.
-
-## Deployment Notes
-
-Stream ingestion may generate a large number of small segments because it's 
difficult to optimize the segment size at
-ingestion time. The number of segments will increase over time, and this might 
cause the query performance issue.
-
-Details on how to optimize the segment size can be found on [Segment size 
optimization](../operations/segment-optimization.html).
diff --git 
a/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java 
b/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java
deleted file mode 100644
index feab7e0..0000000
--- a/server/src/main/java/org/apache/druid/guice/FireDepartmentsProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import org.apache.druid.segment.realtime.FireDepartment;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- */
-public class FireDepartmentsProvider implements Provider<List<FireDepartment>>
-{
-  private final List<FireDepartment> fireDepartments = new ArrayList<>();
-
-  @Inject
-  public FireDepartmentsProvider(
-      ObjectMapper jsonMapper,
-      RealtimeManagerConfig config
-  )
-  {
-    try {
-      this.fireDepartments.addAll(
-          jsonMapper.readValue(config.getSpecFile(), new 
TypeReference<List<FireDepartment>>() {})
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-
-  @Override
-  public List<FireDepartment> get()
-  {
-    return fireDepartments;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java 
b/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java
deleted file mode 100644
index 97da66d..0000000
--- a/server/src/main/java/org/apache/druid/guice/RealtimeManagerConfig.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.File;
-
-/**
- */
-public class RealtimeManagerConfig
-{
-  @JsonProperty
-  private File specFile;
-
-  public File getSpecFile()
-  {
-    return specFile;
-  }
-}
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java 
b/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java
deleted file mode 100644
index feaaeb0..0000000
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/RealtimeManager.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Iterables;
-import com.google.inject.Inject;
-import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.Firehose;
-import org.apache.druid.data.input.FirehoseV2;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.query.FinalizeResultsQueryRunner;
-import org.apache.druid.query.NoopQueryRunner;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.QueryRunnerFactory;
-import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.QuerySegmentWalker;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.spec.SpecificSegmentSpec;
-import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.realtime.plumber.Committers;
-import org.apache.druid.segment.realtime.plumber.Plumber;
-import org.apache.druid.segment.realtime.plumber.Plumbers;
-import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
-import org.joda.time.Interval;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class RealtimeManager implements QuerySegmentWalker
-{
-  private static final EmittingLogger log = new 
EmittingLogger(RealtimeManager.class);
-
-  private final List<FireDepartment> fireDepartments;
-  private final QueryRunnerFactoryConglomerate conglomerate;
-  private final DataSegmentServerAnnouncer serverAnnouncer;
-
-  /**
-   * key=data source name,value=mappings of partition number to FireChief
-   */
-  private final Map<String, Map<Integer, FireChief>> chiefs;
-
-  private ExecutorService fireChiefExecutor;
-  private boolean stopping;
-
-  @Inject
-  public RealtimeManager(
-      List<FireDepartment> fireDepartments,
-      QueryRunnerFactoryConglomerate conglomerate,
-      DataSegmentServerAnnouncer serverAnnouncer
-  )
-  {
-    this(fireDepartments, conglomerate, serverAnnouncer, new HashMap<>());
-  }
-
-  @VisibleForTesting
-  RealtimeManager(
-      List<FireDepartment> fireDepartments,
-      QueryRunnerFactoryConglomerate conglomerate,
-      DataSegmentServerAnnouncer serverAnnouncer,
-      Map<String, Map<Integer, FireChief>> chiefs
-  )
-  {
-    this.fireDepartments = fireDepartments;
-    this.conglomerate = conglomerate;
-    this.serverAnnouncer = serverAnnouncer;
-    this.chiefs = chiefs == null ? new HashMap<>() : new HashMap<>(chiefs);
-  }
-
-  @VisibleForTesting
-  Map<Integer, FireChief> getFireChiefs(String dataSource)
-  {
-    return chiefs.get(dataSource);
-  }
-
-  @LifecycleStart
-  public void start()
-  {
-    serverAnnouncer.announce();
-
-    fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), 
"chief-%d");
-
-    for (final FireDepartment fireDepartment : fireDepartments) {
-      final DataSchema schema = fireDepartment.getDataSchema();
-
-      final FireChief chief = new FireChief(fireDepartment, conglomerate);
-      chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>())
-            
.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);
-
-      fireChiefExecutor.submit(chief);
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    stopping = true;
-    try {
-      if (fireChiefExecutor != null) {
-        fireChiefExecutor.shutdownNow();
-        Preconditions.checkState(
-            fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS),
-            "persistExecutor not terminated"
-        );
-      }
-    }
-    catch (InterruptedException e) {
-      throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()");
-    }
-    serverAnnouncer.unannounce();
-  }
-
-  public FireDepartmentMetrics getMetrics(String datasource)
-  {
-    Map<Integer, FireChief> chiefs = this.chiefs.get(datasource);
-    if (chiefs == null) {
-      return null;
-    }
-    FireDepartmentMetrics snapshot = null;
-    for (FireChief chief : chiefs.values()) {
-      if (snapshot == null) {
-        snapshot = chief.getMetrics().snapshot();
-      } else {
-        snapshot.merge(chief.getMetrics());
-      }
-    }
-    return snapshot;
-  }
-
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, 
Iterable<Interval> intervals)
-  {
-    final QueryRunnerFactory<T, Query<T>> factory = 
conglomerate.findFactory(query);
-    final Map<Integer, FireChief> partitionChiefs = 
chiefs.get(Iterables.getOnlyElement(query.getDataSource()
-                                                                               
              .getNames()));
-
-    return partitionChiefs == null ? new NoopQueryRunner<T>() : 
factory.getToolchest().mergeResults(
-        factory.mergeRunners(
-            Execs.directExecutor(),
-            // Chaining query runners which wait on submitted chain query 
runners can make executor pools deadlock
-            Iterables.transform(
-                partitionChiefs.values(), new Function<FireChief, 
QueryRunner<T>>()
-                {
-                  @Override
-                  public QueryRunner<T> apply(FireChief fireChief)
-                  {
-                    return fireChief.getQueryRunner(query);
-                  }
-                }
-            )
-        )
-    );
-  }
-
-  @Override
-  public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, 
final Iterable<SegmentDescriptor> specs)
-  {
-    final QueryRunnerFactory<T, Query<T>> factory = 
conglomerate.findFactory(query);
-    final Map<Integer, FireChief> partitionChiefs = 
chiefs.get(Iterables.getOnlyElement(query.getDataSource()
-                                                                               
              .getNames()));
-
-    return partitionChiefs == null
-           ? new NoopQueryRunner<T>()
-           : factory.getToolchest().mergeResults(
-               factory.mergeRunners(
-                   Execs.directExecutor(),
-                   Iterables.transform(
-                       specs,
-                       new Function<SegmentDescriptor, QueryRunner<T>>()
-                       {
-                         @Override
-                         public QueryRunner<T> apply(SegmentDescriptor spec)
-                         {
-                           final FireChief retVal = 
partitionChiefs.get(spec.getPartitionNumber());
-                           return retVal == null
-                                  ? new NoopQueryRunner<T>()
-                                  : 
retVal.getQueryRunner(query.withQuerySegmentSpec(new 
SpecificSegmentSpec(spec)));
-                         }
-                       }
-                   )
-               )
-           );
-  }
-
-  class FireChief implements Runnable
-  {
-    private final FireDepartment fireDepartment;
-    private final FireDepartmentMetrics metrics;
-    private final RealtimeTuningConfig config;
-    private final QueryRunnerFactoryConglomerate conglomerate;
-
-    private Plumber plumber;
-
-    FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate 
conglomerate)
-    {
-      this.fireDepartment = fireDepartment;
-      this.conglomerate = conglomerate;
-      this.config = fireDepartment.getTuningConfig();
-      this.metrics = fireDepartment.getMetrics();
-    }
-
-    private Firehose initFirehose()
-    {
-      try {
-        log.info("Calling the FireDepartment and getting a Firehose.");
-        return fireDepartment.connect();
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private FirehoseV2 initFirehoseV2(Object metaData) throws IOException
-    {
-      log.info("Calling the FireDepartment and getting a FirehoseV2.");
-      return fireDepartment.connect(metaData);
-    }
-
-    private void initPlumber()
-    {
-      log.info("Someone get us a plumber!");
-      plumber = fireDepartment.findPlumber();
-    }
-
-    @VisibleForTesting
-    Plumber getPlumber()
-    {
-      return plumber;
-    }
-
-    public FireDepartmentMetrics getMetrics()
-    {
-      return metrics;
-    }
-
-    @Override
-    public void run()
-    {
-      initPlumber();
-
-      try {
-        final Closer closer = Closer.create();
-
-        try {
-          Object metadata = plumber.startJob();
-
-          Firehose firehose;
-          FirehoseV2 firehoseV2;
-          final boolean success;
-          if (fireDepartment.checkFirehoseV2()) {
-            firehoseV2 = initFirehoseV2(metadata);
-            closer.register(firehoseV2);
-            success = runFirehoseV2(firehoseV2);
-          } else {
-            firehose = initFirehose();
-            closer.register(firehose);
-            success = runFirehose(firehose);
-          }
-          if (success) {
-            // pluber.finishJob() is called only when every processing is 
successfully finished.
-            closer.register(() -> plumber.finishJob());
-          }
-        }
-        catch (Exception e) {
-          log.makeAlert(
-              e,
-              "[%s] aborted realtime processing[%s]",
-              e.getClass().getSimpleName(),
-              fireDepartment.getDataSchema().getDataSource()
-          ).emit();
-          throw closer.rethrow(e);
-        }
-        catch (Error e) {
-          log.makeAlert(e, "Error aborted realtime processing[%s]", 
fireDepartment.getDataSchema().getDataSource())
-             .emit();
-          throw closer.rethrow(e);
-        }
-        finally {
-          closer.close();
-        }
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    private boolean runFirehoseV2(FirehoseV2 firehose)
-    {
-      firehose.start();
-
-      log.info("FirehoseV2 started");
-      final Supplier<Committer> committerSupplier = 
Committers.supplierFromFirehoseV2(firehose);
-      boolean haveRow = true;
-      while (haveRow) {
-        if (Thread.interrupted() || stopping) {
-          return false;
-        }
-        InputRow inputRow = null;
-        try {
-          inputRow = firehose.currRow();
-          if (inputRow != null) {
-            IncrementalIndexAddResult addResult = plumber.add(inputRow, 
committerSupplier);
-            int numRows = addResult.getRowCount();
-            if (numRows == -2) {
-              metrics.incrementDedup();
-              log.debug("Throwing away duplicate event[%s]", inputRow);
-            } else if (numRows < 0) {
-              metrics.incrementThrownAway();
-              log.debug("Throwing away event[%s] due to %s", inputRow, 
addResult.getReasonOfNotAdded());
-            } else {
-              metrics.incrementProcessed();
-            }
-          } else {
-            log.debug("thrown away null input row, considering unparseable");
-            metrics.incrementUnparseable();
-          }
-        }
-        catch (Exception e) {
-          log.makeAlert(e, "Unknown exception, Ignoring and continuing.")
-             .addData("inputRow", inputRow)
-             .emit();
-        }
-
-        try {
-          haveRow = firehose.advance();
-        }
-        catch (Exception e) {
-          log.debug(e, "exception in firehose.advance(), considering 
unparseable row");
-          metrics.incrementUnparseable();
-        }
-      }
-      return true;
-    }
-
-    private boolean runFirehose(Firehose firehose)
-    {
-      final Supplier<Committer> committerSupplier = 
Committers.supplierFromFirehose(firehose);
-      while (firehose.hasMore()) {
-        if (Thread.interrupted() || stopping) {
-          return false;
-        }
-        Plumbers.addNextRow(committerSupplier, firehose, plumber, 
config.isReportParseExceptions(), metrics);
-      }
-      return true;
-    }
-
-    public <T> QueryRunner<T> getQueryRunner(Query<T> query)
-    {
-      QueryRunnerFactory<T, Query<T>> factory = 
conglomerate.findFactory(query);
-      QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
-
-      return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), 
toolChest);
-    }
-  }
-}
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java
deleted file mode 100644
index 7df4c14..0000000
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java
+++ /dev/null
@@ -1,1104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.Firehose;
-import org.apache.druid.data.input.FirehoseFactory;
-import org.apache.druid.data.input.FirehoseFactoryV2;
-import org.apache.druid.data.input.FirehoseV2;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.Row;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.query.BaseQuery;
-import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryRunner;
-import org.apache.druid.query.QueryRunnerFactory;
-import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.QueryRunnerTestHelper;
-import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.dimension.DefaultDimensionSpec;
-import org.apache.druid.query.groupby.GroupByQuery;
-import org.apache.druid.query.groupby.GroupByQueryConfig;
-import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
-import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
-import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper;
-import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
-import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
-import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
-import org.apache.druid.query.spec.SpecificSegmentSpec;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
-import org.apache.druid.segment.incremental.IndexSizeExceededException;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
-import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.segment.realtime.plumber.Plumber;
-import org.apache.druid.segment.realtime.plumber.Sink;
-import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
-import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.apache.druid.utils.Runnables;
-import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.Period;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- *
- */
-public class RealtimeManagerTest
-{
-  private static QueryRunnerFactory factory;
-  private static Closer resourceCloser;
-  private static QueryRunnerFactoryConglomerate conglomerate;
-
-  private static final List<TestInputRowHolder> rows = Arrays.asList(
-      makeRow(DateTimes.of("9000-01-01").getMillis()),
-      makeRow(new ParseException("parse error")),
-      null,
-      makeRow(System.currentTimeMillis())
-  );
-
-  private RealtimeManager realtimeManager;
-  private RealtimeManager realtimeManager2;
-  private RealtimeManager realtimeManager3;
-  private DataSchema schema;
-  private DataSchema schema2;
-  private TestPlumber plumber;
-  private TestPlumber plumber2;
-  private RealtimeTuningConfig tuningConfig_0;
-  private RealtimeTuningConfig tuningConfig_1;
-  private DataSchema schema3;
-
-  @BeforeClass
-  public static void setupStatic()
-  {
-    final Pair<GroupByQueryRunnerFactory, Closer> factoryAndCloser = 
initFactory();
-    factory = factoryAndCloser.lhs;
-    resourceCloser = factoryAndCloser.rhs;
-    conglomerate = new QueryRunnerFactoryConglomerate()
-    {
-      @Override
-      public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> 
findFactory(QueryType query)
-      {
-        return factory;
-      }
-    };
-  }
-
-  @AfterClass
-  public static void teardownStatic() throws IOException
-  {
-    resourceCloser.close();
-  }
-
-  @Before
-  public void setUp()
-  {
-    ObjectMapper jsonMapper = new DefaultObjectMapper();
-
-    schema = new DataSchema(
-        "test",
-        null,
-        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, 
null),
-        null,
-        jsonMapper
-    );
-    schema2 = new DataSchema(
-        "testV2",
-        null,
-        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
-        new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, 
null),
-        null,
-        jsonMapper
-    );
-    RealtimeIOConfig ioConfig = new RealtimeIOConfig(
-        new FirehoseFactory()
-        {
-          @Override
-          public Firehose connect(InputRowParser parser, File 
temporaryDirectory)
-          {
-            return new TestFirehose(rows.iterator());
-          }
-        },
-        (schema, config, metrics) -> plumber,
-        null
-    );
-    RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(
-        null,
-        (schema, config, metrics) -> plumber2,
-        new FirehoseFactoryV2()
-        {
-          @Override
-          public FirehoseV2 connect(InputRowParser parser, Object arg1) throws 
ParseException
-          {
-            return new TestFirehoseV2(rows.iterator());
-          }
-        }
-    );
-    RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
-        1,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        0,
-        0,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
-    plumber = new TestPlumber(new Sink(
-        Intervals.of("0/P5000Y"),
-        schema,
-        tuningConfig.getShardSpec(),
-        DateTimes.nowUtc().toString(),
-        tuningConfig.getMaxRowsInMemory(),
-        
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
-        tuningConfig.getDedupColumn()
-    ));
-
-    realtimeManager = new RealtimeManager(
-        Collections.singletonList(
-            new FireDepartment(
-                schema,
-                ioConfig,
-                tuningConfig
-            )
-        ),
-        null,
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)
-    );
-    plumber2 = new TestPlumber(new Sink(
-        Intervals.of("0/P5000Y"),
-        schema2,
-        tuningConfig.getShardSpec(),
-        DateTimes.nowUtc().toString(),
-        tuningConfig.getMaxRowsInMemory(),
-        
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
-        tuningConfig.isReportParseExceptions(),
-        tuningConfig.getDedupColumn()
-    ));
-
-    realtimeManager2 = new RealtimeManager(
-        Collections.singletonList(
-            new FireDepartment(
-                schema2,
-                ioConfig2,
-                tuningConfig
-            )
-        ),
-        null,
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)
-    );
-
-    tuningConfig_0 = new RealtimeTuningConfig(
-        1,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        new LinearShardSpec(0),
-        null,
-        null,
-        0,
-        0,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
-
-    tuningConfig_1 = new RealtimeTuningConfig(
-        1,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        new LinearShardSpec(1),
-        null,
-        null,
-        0,
-        0,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
-
-    schema3 = new DataSchema(
-        "testing",
-        null,
-        new AggregatorFactory[]{new CountAggregatorFactory("ignore")},
-        new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, 
null),
-        null,
-        jsonMapper
-    );
-
-    FireDepartment department_0 = new FireDepartment(schema3, ioConfig, 
tuningConfig_0);
-    FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, 
tuningConfig_1);
-
-    realtimeManager3 = new RealtimeManager(
-        Arrays.asList(department_0, department_1),
-        conglomerate,
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
-        null
-    );
-  }
-
-  @After
-  public void tearDown()
-  {
-    realtimeManager.stop();
-    realtimeManager2.stop();
-    realtimeManager3.stop();
-  }
-
-  @Test
-  public void testRun() throws Exception
-  {
-    realtimeManager.start();
-
-    Stopwatch stopwatch = Stopwatch.createStarted();
-    while (realtimeManager.getMetrics("test").processed() != 1) {
-      Thread.sleep(100);
-      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
-        throw new ISE("Realtime manager should have completed processing 2 
events!");
-      }
-    }
-
-    Assert.assertEquals(1, realtimeManager.getMetrics("test").processed());
-    Assert.assertEquals(2, realtimeManager.getMetrics("test").thrownAway());
-    Assert.assertEquals(1, realtimeManager.getMetrics("test").unparseable());
-    Assert.assertTrue(plumber.isStartedJob());
-    Assert.assertTrue(plumber.isFinishedJob());
-    Assert.assertEquals(0, plumber.getPersistCount());
-  }
-
-  @Test
-  public void testRunV2() throws Exception
-  {
-    realtimeManager2.start();
-
-    Stopwatch stopwatch = Stopwatch.createStarted();
-    while (realtimeManager2.getMetrics("testV2").processed() != 1) {
-      Thread.sleep(100);
-      if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
-        throw new ISE("Realtime manager should have completed processing 2 
events!");
-      }
-    }
-
-    Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed());
-    Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway());
-    Assert.assertEquals(2, 
realtimeManager2.getMetrics("testV2").unparseable());
-    Assert.assertTrue(plumber2.isStartedJob());
-    Assert.assertTrue(plumber2.isFinishedJob());
-    Assert.assertEquals(0, plumber2.getPersistCount());
-  }
-
-  @Test(timeout = 60_000L)
-  public void testNormalStop() throws InterruptedException
-  {
-    final TestFirehose firehose = new TestFirehose(rows.iterator());
-    final TestFirehoseV2 firehoseV2 = new TestFirehoseV2(rows.iterator());
-    final RealtimeIOConfig ioConfig = new RealtimeIOConfig(
-        new FirehoseFactory()
-        {
-          @Override
-          public Firehose connect(InputRowParser parser, File 
temporaryDirectory)
-          {
-            return firehose;
-          }
-        },
-        (schema, config, metrics) -> plumber,
-        null
-    );
-    RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(
-        null,
-        (schema, config, metrics) -> plumber2,
-        (parser, arg) -> firehoseV2
-    );
-
-    final FireDepartment department_0 = new FireDepartment(schema3, ioConfig, 
tuningConfig_0);
-    final FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, 
tuningConfig_1);
-
-    final RealtimeManager realtimeManager = new RealtimeManager(
-        Arrays.asList(department_0, department_1),
-        conglomerate,
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
-        null
-    );
-
-    realtimeManager.start();
-    while (realtimeManager.getMetrics("testing").processed() < 2) {
-      Thread.sleep(100);
-    }
-    realtimeManager.stop();
-
-    Assert.assertTrue(firehose.isClosed());
-    Assert.assertTrue(firehoseV2.isClosed());
-    Assert.assertTrue(plumber.isFinishedJob());
-    Assert.assertTrue(plumber2.isFinishedJob());
-  }
-
-  @Test(timeout = 60_000L)
-  public void testStopByInterruption()
-  {
-    final SleepingFirehose firehose = new SleepingFirehose();
-    final RealtimeIOConfig ioConfig = new RealtimeIOConfig(
-        new FirehoseFactory()
-        {
-          @Override
-          public Firehose connect(InputRowParser parser, File 
temporaryDirectory)
-          {
-            return firehose;
-          }
-        },
-        (schema, config, metrics) -> plumber,
-        null
-    );
-
-    final FireDepartment department_0 = new FireDepartment(schema, ioConfig, 
tuningConfig_0);
-
-    final RealtimeManager realtimeManager = new RealtimeManager(
-        Collections.singletonList(department_0),
-        conglomerate,
-        EasyMock.createNiceMock(DataSegmentServerAnnouncer.class),
-        null
-    );
-
-    realtimeManager.start();
-    realtimeManager.stop();
-
-    Assert.assertTrue(firehose.isClosed());
-    Assert.assertFalse(plumber.isFinishedJob());
-  }
-
-  @Test(timeout = 60_000L)
-  public void testQueryWithInterval() throws InterruptedException
-  {
-    List<Row> expectedResults = Arrays.asList(
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"automotive", "rows", 2L, "idx", 270L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"business", "rows", 2L, "idx", 236L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"entertainment", "rows", 2L, "idx", 316L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"health", "rows", 2L, "idx", 240L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"mezzanine", "rows", 6L, "idx", 5740L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"news", "rows", 2L, "idx", 242L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"premium", "rows", 6L, "idx", 5800L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"technology", "rows", 2L, "idx", 156L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"travel", "rows", 2L, "idx", 238L),
-
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"automotive", "rows", 2L, "idx", 294L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"business", "rows", 2L, "idx", 224L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"entertainment", "rows", 2L, "idx", 332L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"health", "rows", 2L, "idx", 226L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"mezzanine", "rows", 6L, "idx", 4894L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"news", "rows", 2L, "idx", 228L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"premium", "rows", 6L, "idx", 5010L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"technology", "rows", 2L, "idx", 194L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"travel", "rows", 2L, "idx", 252L)
-    );
-
-    realtimeManager3.start();
-
-    awaitStarted();
-
-    for (QueryRunner runner : 
QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
-      GroupByQuery query = GroupByQuery
-          .builder()
-          .setDataSource(QueryRunnerTestHelper.dataSource)
-          .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
-          .setDimensions(new DefaultDimensionSpec("quality", "alias"))
-          .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new 
LongSumAggregatorFactory("idx", "index"))
-          .setGranularity(QueryRunnerTestHelper.dayGran)
-          .build();
-      plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));
-      plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), 
runner));
-
-      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
-          factory,
-          realtimeManager3.getQueryRunnerForIntervals(
-              query,
-              QueryRunnerTestHelper.firstToThird.getIntervals()
-          ),
-          query
-      );
-
-      TestHelper.assertExpectedObjects(expectedResults, results, "interval");
-    }
-
-  }
-
-  private void awaitStarted() throws InterruptedException
-  {
-    while (true) {
-      boolean notAllStarted = realtimeManager3
-          .getFireChiefs("testing").values().stream()
-          .anyMatch(
-              fireChief -> {
-                final Plumber plumber = fireChief.getPlumber();
-                return plumber == null || !((TestPlumber) 
plumber).isStartedJob();
-              }
-          );
-      if (!notAllStarted) {
-        break;
-      }
-      Thread.sleep(10);
-    }
-  }
-
-  @Test(timeout = 60_000L)
-  public void testQueryWithSegmentSpec() throws InterruptedException
-  {
-    List<Row> expectedResults = Arrays.asList(
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"automotive", "rows", 1L, "idx", 135L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"business", "rows", 1L, "idx", 118L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"entertainment", "rows", 1L, "idx", 158L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"health", "rows", 1L, "idx", 120L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"mezzanine", "rows", 3L, "idx", 2870L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"news", "rows", 1L, "idx", 121L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"premium", "rows", 3L, "idx", 2900L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"technology", "rows", 1L, "idx", 78L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", 
"travel", "rows", 1L, "idx", 119L),
-
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"automotive", "rows", 1L, "idx", 147L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"business", "rows", 1L, "idx", 112L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"entertainment", "rows", 1L, "idx", 166L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"health", "rows", 1L, "idx", 113L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"mezzanine", "rows", 3L, "idx", 2447L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"news", "rows", 1L, "idx", 114L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"premium", "rows", 3L, "idx", 2505L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"technology", "rows", 1L, "idx", 97L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", 
"travel", "rows", 1L, "idx", 126L)
-    );
-
-    realtimeManager3.start();
-
-    awaitStarted();
-
-    for (QueryRunner runner : 
QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
-      GroupByQuery query = GroupByQuery
-          .builder()
-          .setDataSource(QueryRunnerTestHelper.dataSource)
-          .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
-          .setDimensions(new DefaultDimensionSpec("quality", "alias"))
-          .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new 
LongSumAggregatorFactory("idx", "index"))
-          .setGranularity(QueryRunnerTestHelper.dayGran)
-          .build();
-      plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));
-      plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), 
runner));
-
-      Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
-          factory,
-          realtimeManager3.getQueryRunnerForSegments(
-              query,
-              ImmutableList.of(
-                  new SegmentDescriptor(
-                      
Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"),
-                      "ver",
-                      0
-                  ))
-          ),
-          query
-      );
-      TestHelper.assertExpectedObjects(expectedResults, results, 
"segmentSpec");
-
-      results = GroupByQueryRunnerTestHelper.runQuery(
-          factory,
-          realtimeManager3.getQueryRunnerForSegments(
-              query,
-              ImmutableList.of(
-                  new SegmentDescriptor(
-                      
Intervals.of("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"),
-                      "ver",
-                      1
-                  ))
-          ),
-          query
-      );
-      TestHelper.assertExpectedObjects(expectedResults, results, 
"segmentSpec");
-    }
-
-  }
-
-  @Test(timeout = 60_000L)
-  public void testQueryWithMultipleSegmentSpec() throws InterruptedException
-  {
-
-    List<Row> expectedResults_both_partitions = Arrays.asList(
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"business", "rows", 2L, "idx", 260L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"health", "rows", 2L, "idx", 236L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"mezzanine", "rows", 4L, "idx", 4556L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"news", "rows", 2L, "idx", 284L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"technology", "rows", 2L, "idx", 202L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", 
"automotive", "rows", 2L, "idx", 288L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", 
"entertainment", "rows", 2L, "idx", 326L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"automotive", "rows", 2L, "idx", 312L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"business", "rows", 2L, "idx", 248L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"entertainment", "rows", 2L, "idx", 326L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"health", "rows", 2L, "idx", 262L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"mezzanine", "rows", 6L, "idx", 5126L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"news", "rows", 2L, "idx", 254L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"premium", "rows", 6L, "idx", 5276L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"technology", "rows", 2L, "idx", 206L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"travel", "rows", 2L, "idx", 260L)
-    );
-
-    List<Row> expectedResults_single_partition_26_28 = Arrays.asList(
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"business", "rows", 1L, "idx", 130L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"health", "rows", 1L, "idx", 118L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"mezzanine", "rows", 2L, "idx", 2278L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"news", "rows", 1L, "idx", 142L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", 
"technology", "rows", 1L, "idx", 101L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", 
"automotive", "rows", 1L, "idx", 144L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", 
"entertainment", "rows", 1L, "idx", 163L)
-    );
-
-    List<Row> expectedResults_single_partition_28_29 = Arrays.asList(
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"automotive", "rows", 1L, "idx", 156L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"business", "rows", 1L, "idx", 124L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"entertainment", "rows", 1L, "idx", 163L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"health", "rows", 1L, "idx", 131L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"mezzanine", "rows", 3L, "idx", 2563L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"news", "rows", 1L, "idx", 127L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"premium", "rows", 3L, "idx", 2638L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"technology", "rows", 1L, "idx", 103L),
-        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", 
"travel", "rows", 1L, "idx", 130L)
-    );
-
-    realtimeManager3.start();
-
-    awaitStarted();
-
-    final Interval interval_26_28 = 
Intervals.of("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z");
-    final Interval interval_28_29 = 
Intervals.of("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z");
-    final SegmentDescriptor descriptor_26_28_0 = new 
SegmentDescriptor(interval_26_28, "ver0", 0);
-    final SegmentDescriptor descriptor_28_29_0 = new 
SegmentDescriptor(interval_28_29, "ver1", 0);
-    final SegmentDescriptor descriptor_26_28_1 = new 
SegmentDescriptor(interval_26_28, "ver0", 1);
-    final SegmentDescriptor descriptor_28_29_1 = new 
SegmentDescriptor(interval_28_29, "ver1", 1);
-
-    GroupByQuery query = GroupByQuery
-        .builder()
-        .setDataSource(QueryRunnerTestHelper.dataSource)
-        .setQuerySegmentSpec(
-            new MultipleSpecificSegmentSpec(
-                ImmutableList.of(
-                    descriptor_26_28_0,
-                    descriptor_28_29_0,
-                    descriptor_26_28_1,
-                    descriptor_28_29_1
-                )))
-        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
-        .setAggregatorSpecs(QueryRunnerTestHelper.rowsCount, new 
LongSumAggregatorFactory("idx", "index"))
-        .setGranularity(QueryRunnerTestHelper.dayGran)
-        .build();
-
-    final Map<Interval, QueryRunner> runnerMap = ImmutableMap.of(
-        interval_26_28,
-        QueryRunnerTestHelper.makeQueryRunner(
-            factory,
-            "druid.sample.numeric.tsv.top",
-            null
-        ),
-        interval_28_29,
-        QueryRunnerTestHelper.makeQueryRunner(
-            factory,
-            "druid.sample.numeric.tsv.bottom",
-            null
-        )
-    );
-    plumber.setRunners(runnerMap);
-    plumber2.setRunners(runnerMap);
-
-    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
-        factory,
-        query.getQuerySegmentSpec().lookup(query, realtimeManager3),
-        query
-    );
-    TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, 
"multi-segmentSpec");
-
-    results = GroupByQueryRunnerTestHelper.runQuery(
-        factory,
-        realtimeManager3.getQueryRunnerForSegments(
-            query,
-            ImmutableList.of(
-                descriptor_26_28_0)
-        ),
-        query
-    );
-    TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, 
results, "multi-segmentSpec");
-
-    results = GroupByQueryRunnerTestHelper.runQuery(
-        factory,
-        realtimeManager3.getQueryRunnerForSegments(
-            query,
-            ImmutableList.of(
-                descriptor_28_29_0)
-        ),
-        query
-    );
-    TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, 
results, "multi-segmentSpec");
-
-    results = GroupByQueryRunnerTestHelper.runQuery(
-        factory,
-        realtimeManager3.getQueryRunnerForSegments(
-            query,
-            ImmutableList.of(
-                descriptor_26_28_1)
-        ),
-        query
-    );
-    TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, 
results, "multi-segmentSpec");
-
-    results = GroupByQueryRunnerTestHelper.runQuery(
-        factory,
-        realtimeManager3.getQueryRunnerForSegments(
-            query,
-            ImmutableList.of(
-                descriptor_28_29_1)
-        ),
-        query
-    );
-    TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, 
results, "multi-segmentSpec");
-
-  }
-
-  private static Pair<GroupByQueryRunnerFactory, Closer> initFactory()
-  {
-    final GroupByQueryConfig config = new GroupByQueryConfig();
-    config.setMaxIntermediateRows(10000);
-    return GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
-  }
-
-  private static TestInputRowHolder makeRow(final long timestamp)
-  {
-    return new TestInputRowHolder(timestamp, null);
-  }
-
-  private static TestInputRowHolder makeRow(final RuntimeException e)
-  {
-    return new TestInputRowHolder(0, e);
-  }
-
-  private static class TestInputRowHolder
-  {
-    private long timestamp;
-    private RuntimeException exception;
-
-    public TestInputRowHolder(long timestamp, RuntimeException exception)
-    {
-      this.timestamp = timestamp;
-      this.exception = exception;
-    }
-
-    public InputRow getRow()
-    {
-      if (exception != null) {
-        throw exception;
-      }
-
-      return new InputRow()
-      {
-        @Override
-        public List<String> getDimensions()
-        {
-          return Collections.singletonList("testDim");
-        }
-
-        @Override
-        public long getTimestampFromEpoch()
-        {
-          return timestamp;
-        }
-
-        @Override
-        public DateTime getTimestamp()
-        {
-          return DateTimes.utc(timestamp);
-        }
-
-        @Override
-        public List<String> getDimension(String dimension)
-        {
-          return new ArrayList<>();
-        }
-
-        @Override
-        public Number getMetric(String metric)
-        {
-          return 0;
-        }
-
-        @Override
-        public Object getRaw(String dimension)
-        {
-          return null;
-        }
-
-        @Override
-        public int compareTo(Row o)
-        {
-          return 0;
-        }
-      };
-    }
-  }
-
-  private static class TestFirehose implements Firehose
-  {
-    private final Iterator<TestInputRowHolder> rows;
-    private boolean closed;
-
-    private TestFirehose(Iterator<TestInputRowHolder> rows)
-    {
-      this.rows = rows;
-    }
-
-    @Override
-    public boolean hasMore()
-    {
-      return rows.hasNext();
-    }
-
-    @Nullable
-    @Override
-    public InputRow nextRow()
-    {
-      final TestInputRowHolder holder = rows.next();
-      if (holder == null) {
-        return null;
-      } else {
-        return holder.getRow();
-      }
-    }
-
-    @Override
-    public Runnable commit()
-    {
-      return Runnables.getNoopRunnable();
-    }
-
-    public boolean isClosed()
-    {
-      return closed;
-    }
-
-    @Override
-    public void close()
-    {
-      closed = true;
-    }
-  }
-
-  private static class TestFirehoseV2 implements FirehoseV2
-  {
-    private final Iterator<TestInputRowHolder> rows;
-    private InputRow currRow;
-    private boolean stop;
-    private boolean closed;
-
-    private TestFirehoseV2(Iterator<TestInputRowHolder> rows)
-    {
-      this.rows = rows;
-    }
-
-    private void nextMessage()
-    {
-      currRow = null;
-      while (currRow == null) {
-        final TestInputRowHolder holder = rows.next();
-        currRow = holder == null ? null : holder.getRow();
-      }
-    }
-
-    @Override
-    public void close()
-    {
-      closed = true;
-    }
-
-    public boolean isClosed()
-    {
-      return closed;
-    }
-
-    @Override
-    public boolean advance()
-    {
-      stop = !rows.hasNext();
-      if (stop) {
-        return false;
-      }
-
-      nextMessage();
-      return true;
-    }
-
-    @Override
-    public InputRow currRow()
-    {
-      return currRow;
-    }
-
-    @Override
-    public Committer makeCommitter()
-    {
-      return new Committer()
-      {
-        @Override
-        public Object getMetadata()
-        {
-          return null;
-        }
-
-        @Override
-        public void run()
-        {
-        }
-      };
-    }
-
-    @Override
-    public void start()
-    {
-      nextMessage();
-    }
-  }
-
-  private static class SleepingFirehose implements Firehose
-  {
-    private boolean closed;
-
-    @Override
-    public boolean hasMore()
-    {
-      try {
-        Thread.sleep(1000);
-      }
-      catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-      return true;
-    }
-
-    @Nullable
-    @Override
-    public InputRow nextRow()
-    {
-      return null;
-    }
-
-    @Override
-    public Runnable commit()
-    {
-      return null;
-    }
-
-    public boolean isClosed()
-    {
-      return closed;
-    }
-
-    @Override
-    public void close()
-    {
-      closed = true;
-    }
-  }
-
-  private static class TestPlumber implements Plumber
-  {
-    private final Sink sink;
-
-
-    private volatile boolean startedJob = false;
-    private volatile boolean finishedJob = false;
-    private volatile int persistCount = 0;
-
-    private Map<Interval, QueryRunner> runners;
-
-    private TestPlumber(Sink sink)
-    {
-      this.sink = sink;
-    }
-
-    private boolean isStartedJob()
-    {
-      return startedJob;
-    }
-
-    private boolean isFinishedJob()
-    {
-      return finishedJob;
-    }
-
-    private int getPersistCount()
-    {
-      return persistCount;
-    }
-
-    @Override
-    public Object startJob()
-    {
-      startedJob = true;
-      return null;
-    }
-
-    @Override
-    public IncrementalIndexAddResult add(InputRow row, Supplier<Committer> committerSupplier)
-        throws IndexSizeExceededException
-    {
-      if (row == null) {
-        return Plumber.THROWAWAY;
-      }
-
-      Sink sink = getSink(row.getTimestampFromEpoch());
-
-      if (sink == null) {
-        return Plumber.THROWAWAY;
-      }
-
-      return sink.add(row, false);
-    }
-
-    public Sink getSink(long timestamp)
-    {
-      if (sink.getInterval().contains(timestamp)) {
-        return sink;
-      }
-      return null;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
-    {
-      if (runners == null) {
-        throw new UnsupportedOperationException();
-      }
-
-      final BaseQuery baseQuery = (BaseQuery) query;
-
-      if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) {
-        return factory.getToolchest()
-                      .mergeResults(
-                          factory.mergeRunners(
-                              Execs.directExecutor(),
-                              Iterables.transform(
-                                  baseQuery.getIntervals(),
-                                  new Function<Interval, QueryRunner<T>>()
-                                  {
-                                    @Override
-                                    public QueryRunner<T> apply(Interval input)
-                                    {
-                                      return runners.get(input);
-                                    }
-                                  }
-                              )
-                          )
-                      );
-      }
-
-      Assert.assertEquals(1, query.getIntervals().size());
-
-      final SegmentDescriptor descriptor =
-          ((SpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptor();
-
-      return new SpecificSegmentQueryRunner<T>(
-          runners.get(descriptor.getInterval()),
-          new SpecificSegmentSpec(descriptor)
-      );
-    }
-
-    @Override
-    public void persist(Committer committer)
-    {
-      persistCount++;
-    }
-
-    @Override
-    public void finishJob()
-    {
-      finishedJob = true;
-    }
-
-    public void setRunners(Map<Interval, QueryRunner> runners)
-    {
-      this.runners = runners;
-    }
-  }
-
-}
diff --git a/server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
similarity index 97%
rename from server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
rename to server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
index c81d309..c08111d 100644
--- a/server/src/test/java/org/apache/druid/realtime/firehose/CombiningFirehoseFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/CombiningFirehoseFactoryTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.realtime.firehose;
+package org.apache.druid.segment.realtime.firehose;
 
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
@@ -26,7 +26,6 @@ import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import org.apache.druid.utils.Runnables;
 import org.joda.time.DateTime;
 import org.junit.Assert;
diff --git a/services/src/main/java/org/apache/druid/cli/CliRealtime.java b/services/src/main/java/org/apache/druid/cli/CliRealtime.java
deleted file mode 100644
index 3a5ee57..0000000
--- a/services/src/main/java/org/apache/druid/cli/CliRealtime.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.cli;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import com.google.inject.Module;
-import com.google.inject.name.Names;
-import io.airlift.airline.Command;
-import org.apache.druid.guice.DruidProcessingModule;
-import org.apache.druid.guice.QueryRunnerFactoryModule;
-import org.apache.druid.guice.QueryableModule;
-import org.apache.druid.guice.RealtimeModule;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.lookup.LookupModule;
-import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- */
-@Command(
-    name = "realtime",
-    description = "Runs a realtime node, see https://druid.apache.org/docs/latest/Realtime.html for a description"
-)
-public class CliRealtime extends ServerRunnable
-{
-  private static final Logger log = new Logger(CliRealtime.class);
-
-  @Inject
-  private Properties properties;
-
-  public CliRealtime()
-  {
-    super(log);
-  }
-
-  @Override
-  protected List<? extends Module> getModules()
-  {
-    return ImmutableList.of(
-        new DruidProcessingModule(),
-        new QueryableModule(),
-        new QueryRunnerFactoryModule(),
-        new RealtimeModule(),
-        binder -> {
-          binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
-          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
-          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284);
-        },
-        new ChatHandlerServerModule(properties),
-        new LookupModule()
-    );
-  }
-}
diff --git a/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java b/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java
deleted file mode 100644
index a4029a7..0000000
--- a/services/src/main/java/org/apache/druid/cli/CliRealtimeExample.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.cli;
-
-import com.google.common.collect.ImmutableList;
-import com.google.inject.Inject;
-import com.google.inject.Module;
-import com.google.inject.name.Names;
-import io.airlift.airline.Command;
-import org.apache.druid.client.DruidServer;
-import org.apache.druid.client.InventoryView;
-import org.apache.druid.client.ServerView;
-import org.apache.druid.guice.DruidProcessingModule;
-import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.guice.QueryRunnerFactoryModule;
-import org.apache.druid.guice.QueryableModule;
-import org.apache.druid.guice.RealtimeModule;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.lookup.LookupModule;
-import org.apache.druid.segment.loading.DataSegmentPusher;
-import org.apache.druid.segment.loading.NoopDataSegmentPusher;
-import org.apache.druid.server.coordination.DataSegmentAnnouncer;
-import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
-import org.apache.druid.server.initialization.jetty.ChatHandlerServerModule;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-
-/**
- */
-@Command(
-    name = "realtime",
-    description = "Runs a standalone realtime node for examples, see https://druid.apache.org/docs/latest/Realtime.html for a description"
-)
-public class CliRealtimeExample extends ServerRunnable
-{
-  private static final Logger log = new Logger(CliRealtimeExample.class);
-
-  @Inject
-  private Properties properties;
-
-  public CliRealtimeExample()
-  {
-    super(log);
-  }
-
-  @Override
-  protected List<? extends Module> getModules()
-  {
-    return ImmutableList.of(
-        new DruidProcessingModule(),
-        new QueryableModule(),
-        new QueryRunnerFactoryModule(),
-        new RealtimeModule(),
-        binder -> {
-          binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/realtime");
-          binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8084);
-          binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8284);
-
-          binder.bind(DataSegmentPusher.class).to(NoopDataSegmentPusher.class).in(LazySingleton.class);
-          binder.bind(DataSegmentAnnouncer.class).to(NoopDataSegmentAnnouncer.class).in(LazySingleton.class);
-          binder.bind(InventoryView.class).to(NoopInventoryView.class).in(LazySingleton.class);
-          binder.bind(ServerView.class).to(NoopServerView.class).in(LazySingleton.class);
-        },
-        new ChatHandlerServerModule(properties),
-        new LookupModule()
-    );
-  }
-
-  private static class NoopServerView implements ServerView
-  {
-    @Override
-    public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
-    {
-      // do nothing
-    }
-
-    @Override
-    public void registerSegmentCallback(Executor exec, SegmentCallback callback)
-    {
-      // do nothing
-    }
-  }
-
-  private static class NoopInventoryView implements InventoryView
-  {
-    @Override
-    public DruidServer getInventoryValue(String serverKey)
-    {
-      return null;
-    }
-
-    @Override
-    public Collection<DruidServer> getInventory()
-    {
-      return ImmutableList.of();
-    }
-
-    @Override
-    public boolean isStarted()
-    {
-      return true;
-    }
-
-    @Override
-    public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
-    {
-      return false;
-    }
-  }
-}
diff --git a/services/src/main/java/org/apache/druid/cli/Main.java b/services/src/main/java/org/apache/druid/cli/Main.java
index 34f00a0..0cc4427 100644
--- a/services/src/main/java/org/apache/druid/cli/Main.java
+++ b/services/src/main/java/org/apache/druid/cli/Main.java
@@ -59,7 +59,6 @@ public class Main
         CliCoordinator.class,
         CliHistorical.class,
         CliBroker.class,
-        CliRealtime.class,
         CliOverlord.class,
         CliMiddleManager.class,
         CliRouter.class
@@ -69,11 +68,6 @@ public class Main
            .withDefaultCommand(Help.class)
            .withCommands(serverCommands);
 
-    builder.withGroup("example")
-           .withDescription("Run an example")
-           .withDefaultCommand(Help.class)
-           .withCommands(CliRealtimeExample.class);
-
     List<Class<? extends Runnable>> toolCommands = Arrays.asList(
         DruidJsonValidator.class,
         PullDependencies.class,
diff --git a/services/src/main/java/org/apache/druid/guice/RealtimeModule.java b/services/src/main/java/org/apache/druid/guice/RealtimeModule.java
deleted file mode 100644
index 5285c5c..0000000
--- a/services/src/main/java/org/apache/druid/guice/RealtimeModule.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.guice;
-
-import com.google.inject.Binder;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.TypeLiteral;
-import com.google.inject.multibindings.MapBinder;
-import org.apache.druid.cli.QueryJettyServerInitializer;
-import org.apache.druid.client.cache.CacheConfig;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.metadata.MetadataSegmentPublisher;
-import org.apache.druid.query.QuerySegmentWalker;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.NoopSegmentPublisher;
-import org.apache.druid.segment.realtime.RealtimeManager;
-import org.apache.druid.segment.realtime.SegmentPublisher;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
-import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
-import org.apache.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
-import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
-import org.apache.druid.server.QueryResource;
-import org.apache.druid.server.SegmentManager;
-import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.coordination.ZkCoordinator;
-import org.apache.druid.server.http.SegmentListerResource;
-import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
-import org.apache.druid.server.metrics.QueryCountStatsProvider;
-import org.eclipse.jetty.server.Server;
-
-import java.util.List;
-
-/**
- */
-public class RealtimeModule implements Module
-{
-
-  @Override
-  public void configure(Binder binder)
-  {
-    PolyBind.createChoiceWithDefault(binder, "druid.publish.type", Key.get(SegmentPublisher.class), "metadata");
-    final MapBinder<String, SegmentPublisher> publisherBinder = PolyBind.optionBinder(
-        binder,
-        Key.get(SegmentPublisher.class)
-    );
-    publisherBinder.addBinding("noop").to(NoopSegmentPublisher.class).in(LazySingleton.class);
-    publisherBinder.addBinding("metadata").to(MetadataSegmentPublisher.class).in(LazySingleton.class);
-
-    PolyBind.createChoice(
-        binder,
-        "druid.realtime.rowIngestionMeters.type",
-        Key.get(RowIngestionMetersFactory.class),
-        Key.get(DropwizardRowIngestionMetersFactory.class)
-    );
-    final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
-        PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
-    rowIngestionMetersHandlerProviderBinder
-        .addBinding("dropwizard")
-        .to(DropwizardRowIngestionMetersFactory.class)
-        .in(LazySingleton.class);
-    binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
-
-    PolyBind.createChoice(
-        binder,
-        "druid.realtime.chathandler.type",
-        Key.get(ChatHandlerProvider.class),
-        Key.get(ServiceAnnouncingChatHandlerProvider.class)
-    );
-    final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
-        PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
-    handlerProviderBinder
-        .addBinding("announce")
-        .to(ServiceAnnouncingChatHandlerProvider.class)
-        .in(LazySingleton.class);
-    handlerProviderBinder
-        .addBinding("noop")
-        .to(NoopChatHandlerProvider.class)
-        .in(LazySingleton.class);
-
-    JsonConfigProvider.bind(binder, "druid.realtime", RealtimeManagerConfig.class);
-    binder.bind(
-        new TypeLiteral<List<FireDepartment>>()
-        {
-        }
-    )
-          .toProvider(FireDepartmentsProvider.class)
-          .in(LazySingleton.class);
-
-    JsonConfigProvider.bind(binder, "druid.segment.handoff", CoordinatorBasedSegmentHandoffNotifierConfig.class);
-    binder.bind(SegmentHandoffNotifierFactory.class)
-          .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
-          .in(LazySingleton.class);
-    binder.bind(CoordinatorClient.class).in(LazySingleton.class);
-
-    JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
-    binder.install(new CacheModule());
-
-    binder.bind(QuerySegmentWalker.class).to(RealtimeManager.class).in(ManageLifecycle.class);
-    binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.REALTIME));
-    binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
-    binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
-    Jerseys.addResource(binder, QueryResource.class);
-    Jerseys.addResource(binder, SegmentListerResource.class);
-    LifecycleModule.register(binder, QueryResource.class);
-    LifecycleModule.register(binder, Server.class);
-
-    binder.bind(SegmentManager.class).in(LazySingleton.class);
-    binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
-    LifecycleModule.register(binder, ZkCoordinator.class);
-  }
-}
diff --git a/services/src/test/java/org/apache/druid/cli/MainTest.java b/services/src/test/java/org/apache/druid/cli/MainTest.java
index 1a35b66..3e960f6 100644
--- a/services/src/test/java/org/apache/druid/cli/MainTest.java
+++ b/services/src/test/java/org/apache/druid/cli/MainTest.java
@@ -50,8 +50,6 @@ public class MainTest
         //new Object[]{new CliInternalHadoopIndexer()},
 
         new Object[]{new CliMiddleManager()},
-        new Object[]{new CliRealtime()},
-        new Object[]{new CliRealtimeExample()},
         new Object[]{new CliRouter()}
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to