This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0a667  Introduce SystemSchema tables (#5989) (#6094)
3a0a667 is described below

commit 3a0a667fe03d03d97c4e5d6cbeb3e60e28c7d4f7
Author: Surekha <[email protected]>
AuthorDate: Wed Oct 10 17:17:29 2018 -0700

    Introduce SystemSchema tables (#5989) (#6094)
    
    * Added SystemSchema with following tables (#5989)
    
    * SEGMENTS table provides details on served and published segments
    * SERVERS table provides details on data servers
    * SERVER_SEGMENTS table is the JOIN of SEGMENTS and SERVERS
    * TASKS table provides details on tasks
    
    * Add documentation for system schema
    
    * Fix static-analysis warnings
    
    * Address PR comments
    
    * Add unit tests
    
    * Fix a test
    
    * Try to fix a test
    
    * Fix a bug around replica count
    
    * rename io.druid to org.apache.druid
    
    * Major change is to make task and segment queries streaming
    
    * Made tasks/segments stream to Calcite instead of storing them in memory
    * Add num_rows to segments table
    * Refactor JsonParserIterator
    * Replace with closeable iterator
    
    * Fix docs, make num_rows column nullable, some unit test changes
    
    * make num_rows column type long, allow it to be null
    
    * Fix a compile error after merge, add TrafficCop param to InputStreamResponseHandler
    
    * Filter null rows for segments table from Linq4j enumerable
    
    * change num_replicas datatype to long in segments table
    
    * Fix some tests and address comments
    
    * Doc updates, other PR comments
    
    * Update tests
    
    * Address comments
    
    * Add auth check
    * Update docs
    * Refactoring
    
    * Fix TeamCity warning, change getQueryableServer in TimelineServerView
    
    * Fix compilation after rebase
    
    * Use the stream API from AuthorizationUtils
    
    * Added LeaderClient interface and NoopDruidLeaderClient class
    
    * Revert "Added LeaderClient interface and NoopDruidLeaderClient class"
    
    This reverts commit 100fa46e396ab0f68da6c4bef80951f6b996657e.
    
    * Make the naming consistent with server_segments for the join table
    
    * Add ForbiddenException on auth check failure
    * Remove static block from SystemSchema
    
    * Try to fix a test in CalciteQueryTest due to rename of server_segments
    
    * Fix the json output format in the coordinator API
    
    * Add auth check in the segments API
    * Add null check to avoid NPE
    
    * Use an anonymous class object instead of a mock for DruidLeaderClient in SqlBenchmark
    
    * Fix test failures, type long/BIGINT can be nullable
    
    * Revert long nullability to fix tests
    
    * Fix style for tests
    
    * PR comments
    
    * Address PR comments
    
    * Add the missing BytesAccumulatingResponseHandler class
    
    * Use Sequences.withBaggage in DruidPlanner
    
    * Fix docs, add comments
    
    * Close the iterator if hasNext returns false
---
 .../apache/druid/benchmark/query/SqlBenchmark.java |   8 +-
 docs/content/querying/sql.md                       | 100 ++++
 .../histogram/sql/QuantileSqlAggregatorTest.java   |   3 +
 .../org/apache/druid/client/BrokerServerView.java  |  10 +
 .../org/apache/druid/client/DirectDruidClient.java | 119 +---
 .../apache/druid/client/ImmutableDruidServer.java  |  10 +
 .../apache/druid/client/JsonParserIterator.java    | 160 +++++
 .../apache/druid/client/TimelineServerView.java    |   6 +
 .../apache/druid/discovery/DruidLeaderClient.java  |  13 +
 .../coordination/ChangeRequestHttpSyncer.java      |  27 +-
 .../BytesAccumulatingResponseHandler.java          |  52 ++
 .../server/coordinator/HttpLoadQueuePeon.java      |  26 +-
 .../apache/druid/server/http/MetadataResource.java |  44 +-
 .../CachingClusteredClientFunctionalityTest.java   |   8 +
 .../druid/client/CachingClusteredClientTest.java   |   6 +
 .../apache/druid/sql/calcite/planner/Calcites.java |   9 +-
 .../druid/sql/calcite/planner/DruidPlanner.java    |  65 +-
 .../druid/sql/calcite/planner/PlannerFactory.java  |  10 +-
 .../druid/sql/calcite/schema/DruidSchema.java      |  88 ++-
 .../sql/calcite/schema/SegmentMetadataHolder.java  | 134 +++++
 .../druid/sql/calcite/schema/SystemSchema.java     | 658 +++++++++++++++++++++
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |   5 +
 .../druid/sql/avatica/DruidStatementTest.java      |   3 +
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  16 +-
 .../druid/sql/calcite/http/SqlResourceTest.java    |   3 +
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 632 ++++++++++++++++++++
 .../druid/sql/calcite/util/CalciteTests.java       |  33 ++
 .../sql/calcite/util/TestServerInventoryView.java  |  11 +-
 28 files changed, 2054 insertions(+), 205 deletions(-)

diff --git 
a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java 
b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 3fde019..7971b54 100644
--- 
a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ 
b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -44,6 +44,8 @@ import org.apache.druid.sql.calcite.planner.DruidPlanner;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.planner.PlannerResult;
+import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.timeline.DataSegment;
@@ -111,10 +113,12 @@ public class SqlBenchmark
         .createQueryRunnerFactoryConglomerate();
     final QueryRunnerFactoryConglomerate conglomerate = 
conglomerateCloserPair.lhs;
     final PlannerConfig plannerConfig = new PlannerConfig();
-
+    final DruidSchema druidSchema = 
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = 
CalciteTests.createMockSystemSchema(druidSchema, walker);
     this.walker = new 
SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index);
     plannerFactory = new PlannerFactory(
-        CalciteTests.createMockSchema(conglomerate, walker, plannerConfig),
+        druidSchema,
+        systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         CalciteTests.createOperatorTable(),
         CalciteTests.createExprMacroTable(),
diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 0130d6f..e604a1c 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -477,6 +477,11 @@ plan SQL queries. This metadata is cached on broker 
startup and also updated per
 [SegmentMetadata queries](segmentmetadataquery.html). Background metadata 
refreshing is triggered by
 segments entering and exiting the cluster, and can also be throttled through 
configuration.
 
+Druid exposes system information through special system tables. Two such schemas are available: the INFORMATION_SCHEMA and the "sys" schema.
+INFORMATION_SCHEMA provides details about table and column types, while the "sys" schema provides information about Druid internals such as segments, tasks, and servers.
+
+## INFORMATION SCHEMA
+
 You can access table and column metadata through JDBC using 
`connection.getMetaData()`, or through the
 INFORMATION_SCHEMA tables described below. For example, to retrieve metadata 
for the Druid
 datasource "foo", use the query:
@@ -528,6 +533,101 @@ SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE 
TABLE_SCHEMA = 'druid' AND TABLE_
 |COLLATION_NAME||
 |JDBC_TYPE|Type code from java.sql.Types (Druid extension)|
 
+## SYSTEM SCHEMA
+
+The "sys" schema provides visibility into Druid segments, servers and tasks.
+For example to retrieve all segments for datasource "wikipedia", use the query:
+```sql
+SELECT * FROM sys.segments WHERE datasource = 'wikipedia'
+```
+
+### SEGMENTS table
+The segments table provides details on all Druid segments, whether or not they have been published yet.
+
+
+|Column|Notes|
+|------|-----|
+|segment_id|Unique segment identifier|
+|datasource|Name of datasource|
+|start|Interval start time (in ISO 8601 format)|
+|end|Interval end time (in ISO 8601 format)|
+|size|Size of segment in bytes|
+|version|Version string (generally an ISO8601 timestamp corresponding to when the segment set was first started). A higher version means a more recently created segment; versions are compared as strings.|
+|partition_num|Partition number (an integer, unique within a datasource+interval+version; may not necessarily be contiguous)|
+|num_replicas|Number of replicas of this segment currently being served|
+|num_rows|Number of rows in the segment; this value can be null if it is unknown to the broker at query time|
+|is_published|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment has been published to the metadata store|
+|is_available|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment is currently being served by some server (historical or realtime)|
+|is_realtime|Boolean represented as a long, where 1 = true and 0 = false. 1 means this segment is being served by realtime tasks|
+|payload|JSON-serialized data segment payload|
+
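+For example, a query like the following (a sketch using only the boolean columns documented above) should list published segments that are not currently being served:
+```sql
+SELECT segment_id, datasource, num_rows
+FROM sys.segments
+WHERE is_published = 1 AND is_available = 0;
+```
+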
+### SERVERS table
+The servers table lists all data servers (any server that hosts a segment). It includes both historicals and peons.
+
+|Column|Notes|
+|------|-----|
+|server|Server name in the form host:port|
+|host|Hostname of the server|
+|plaintext_port|Unsecured port of the server, or -1 if plaintext traffic is disabled|
+|tls_port|TLS port of the server, or -1 if TLS is disabled|
+|server_type|Type of Druid service. Possible values include: historical, realtime, and indexer_executor (peon).|
+|tier|Distribution tier; see [druid.server.tier](../configuration/index.html#Historical-General-Configuration)|
+|current_size|Current size of segments in bytes on this server|
+|max_size|Max size in bytes this server recommends assigning to segments; see [druid.server.maxSize](../configuration/index.html#Historical-General-Configuration)|
+
+To retrieve information about all servers, use the query:
+```sql
+SELECT * FROM sys.servers;
+```
+
+### SERVER_SEGMENTS table
+
+The SERVER_SEGMENTS table can be used to join the servers and segments tables.
+
+|Column|Notes|
+|------|-----|
+|server|Server name in the format host:port (primary key of the [servers table](#servers-table))|
+|segment_id|Segment identifier (primary key of the [segments table](#segments-table))|
+
+JOIN between "servers" and "segments" can be used to query the number of 
segments for a specific datasource, 
+grouped by server, example query:
+```sql
+SELECT COUNT(segments.segment_id) AS num_segments
+FROM sys.segments AS segments
+INNER JOIN sys.server_segments AS server_segments
+ON segments.segment_id = server_segments.segment_id
+INNER JOIN sys.servers AS servers
+ON servers.server = server_segments.server
+WHERE segments.datasource = 'wikipedia'
+GROUP BY servers.server;
+```
+
+### TASKS table
+
+The tasks table provides information about active and recently completed indexing tasks. For more information, see [ingestion tasks](../ingestion/tasks.html).
+
+|Column|Notes|
+|------|-----|
+|task_id|Unique task identifier|
+|type|Task type; for example, this value is "index" for indexing tasks. See [tasks-overview](../ingestion/tasks.html)|
+|datasource|Datasource name being indexed|
+|created_time|Timestamp in ISO8601 format corresponding to when the ingestion task was created. Note that this value is populated for completed and waiting tasks. For running and pending tasks this value is set to 1970-01-01T00:00:00Z|
+|queue_insertion_time|Timestamp in ISO8601 format corresponding to when this task was added to the queue on the overlord|
+|status|Status of a task; can be RUNNING, FAILED, or SUCCESS|
+|runner_status|Runner status of the task. For a completed task this is NONE; for in-progress tasks it can be RUNNING, WAITING, or PENDING|
+|duration|Time in milliseconds it took to finish the task; this value is present only for completed tasks|
+|location|Server name where this task is running, in the format host:port; this information is present only for RUNNING tasks|
+|host|Hostname of the server where the task is running|
+|plaintext_port|Unsecured port of the server, or -1 if plaintext traffic is disabled|
+|tls_port|TLS port of the server, or -1 if TLS is disabled|
+|error_msg|Detailed error message in case of FAILED tasks|
+
+For example, to retrieve task information filtered by status, use the query:
+```sql
+SELECT * FROM sys.tasks WHERE status = 'FAILED';
+```
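+
+Completed tasks report their duration, so a query like the following (again using only the columns documented above) should show how long recent successful tasks for a datasource took:
+```sql
+SELECT task_id, duration
+FROM sys.tasks
+WHERE status = 'SUCCESS' AND datasource = 'wikipedia';
+```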
+
+
 ## Server configuration
 
 The Druid SQL server is configured through the following properties on the 
broker.
diff --git 
a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
 
b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 3ae2fbc..4de5060 100644
--- 
a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ 
b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -60,6 +60,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.planner.PlannerResult;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -150,6 +151,7 @@ public class QuantileSqlAggregatorTest extends 
CalciteTestBase
 
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = 
CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = 
CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = new DruidOperatorTable(
         ImmutableSet.of(new QuantileSqlAggregator()),
         ImmutableSet.of()
@@ -157,6 +159,7 @@ public class QuantileSqlAggregatorTest extends 
CalciteTestBase
 
     plannerFactory = new PlannerFactory(
         druidSchema,
+        systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
         CalciteTests.createExprMacroTable(),
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java 
b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index cbfd717..f4e874d 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -45,12 +45,14 @@ import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  */
@@ -322,4 +324,12 @@ public class BrokerServerView implements TimelineServerView
       );
     }
   }
+
+  @Override
+  public List<ImmutableDruidServer> getDruidServers()
+  {
+    return clients.values().stream()
+                  .map(queryableDruidServer -> 
queryableDruidServer.getServer().toImmutableDruidServer())
+                  .collect(Collectors.toList());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java 
b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 39396b8..da2fb03 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -19,9 +19,6 @@
 
 package org.apache.druid.client;
 
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.fasterxml.jackson.core.ObjectCodec;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.type.TypeFactory;
@@ -32,7 +29,6 @@ import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -52,14 +48,12 @@ import 
org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.query.BySegmentResultValueClass;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryMetrics;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.QueryWatcher;
-import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -71,20 +65,16 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.joda.time.Duration;
 
 import javax.ws.rs.core.MediaType;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.SequenceInputStream;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Enumeration;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -550,7 +540,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
           @Override
           public JsonParserIterator<T> make()
           {
-            return new JsonParserIterator<T>(typeRef, future, url, query);
+            return new JsonParserIterator<T>(typeRef, future, url, query, 
host, objectMapper);
           }
 
           @Override
@@ -576,113 +566,6 @@ public class DirectDruidClient<T> implements 
QueryRunner<T>
     return retVal;
   }
 
-  private class JsonParserIterator<T> implements Iterator<T>, Closeable
-  {
-    private JsonParser jp;
-    private ObjectCodec objectCodec;
-    private final JavaType typeRef;
-    private final Future<InputStream> future;
-    private final Query<T> query;
-    private final String url;
-
-    public JsonParserIterator(JavaType typeRef, Future<InputStream> future, 
String url, Query<T> query)
-    {
-      this.typeRef = typeRef;
-      this.future = future;
-      this.url = url;
-      this.query = query;
-      jp = null;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-      init();
-
-      if (jp.isClosed()) {
-        return false;
-      }
-      if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
-        CloseQuietly.close(jp);
-        return false;
-      }
-
-      return true;
-    }
-
-    @Override
-    public T next()
-    {
-      init();
-
-      try {
-        final T retVal = objectCodec.readValue(jp, typeRef);
-        jp.nextToken();
-        return retVal;
-      }
-      catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
-    }
-
-    @Override
-    public void remove()
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    private void init()
-    {
-      if (jp == null) {
-        try {
-          InputStream is = future.get();
-          if (is == null) {
-            throw new QueryInterruptedException(
-                new ResourceLimitExceededException(
-                    "query[%s] url[%s] timed out or max bytes limit reached.",
-                    query.getId(),
-                    url
-                ),
-                host
-            );
-          } else {
-            jp = objectMapper.getFactory().createParser(is);
-          }
-          final JsonToken nextToken = jp.nextToken();
-          if (nextToken == JsonToken.START_OBJECT) {
-            QueryInterruptedException cause = jp.getCodec().readValue(jp, 
QueryInterruptedException.class);
-            throw new QueryInterruptedException(cause, host);
-          } else if (nextToken != JsonToken.START_ARRAY) {
-            throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url 
[%s]", jp.getCurrentToken(), url);
-          } else {
-            jp.nextToken();
-            objectCodec = jp.getCodec();
-          }
-        }
-        catch (IOException | InterruptedException | ExecutionException e) {
-          throw new RE(
-              e,
-              "Failure getting results for query[%s] url[%s] because of [%s]",
-              query.getId(),
-              url,
-              e.getMessage()
-          );
-        }
-        catch (CancellationException e) {
-          throw new QueryInterruptedException(e, host);
-        }
-      }
-    }
-
-    @Override
-    public void close() throws IOException
-    {
-      if (jp != null) {
-        jp.close();
-      }
-    }
-  }
-
   @Override
   public String toString()
   {
diff --git 
a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java 
b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
index 5426d79..280ed38 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
@@ -65,6 +65,16 @@ public class ImmutableDruidServer
     return metadata.getHost();
   }
 
+  public String getHostAndPort()
+  {
+    return metadata.getHostAndPort();
+  }
+
+  public String getHostAndTlsPort()
+  {
+    return metadata.getHostAndTlsPort();
+  }
+
   public long getCurrSize()
   {
     return currSize;
diff --git 
a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java 
b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
new file mode 100644
index 0000000..18535c1
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.ObjectCodec;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Throwables;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.guava.CloseQuietly;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.ResourceLimitExceededException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class JsonParserIterator<T> implements Iterator<T>, Closeable
+{
+  private JsonParser jp;
+  private ObjectCodec objectCodec;
+  private final JavaType typeRef;
+  private final Future<InputStream> future;
+  private final Query<T> query;
+  private final String url;
+  private final String host;
+  private final ObjectMapper objectMapper;
+
+  public JsonParserIterator(
+      JavaType typeRef,
+      Future<InputStream> future,
+      String url,
+      Query<T> query,
+      String host,
+      ObjectMapper objectMapper
+  )
+  {
+    this.typeRef = typeRef;
+    this.future = future;
+    this.url = url;
+    this.query = query;
+    jp = null;
+    this.host = host;
+    this.objectMapper = objectMapper;
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    init();
+
+    if (jp.isClosed()) {
+      return false;
+    }
+    if (jp.getCurrentToken() == JsonToken.END_ARRAY) {
+      CloseQuietly.close(jp);
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public T next()
+  {
+    init();
+
+    try {
+      final T retVal = objectCodec.readValue(jp, typeRef);
+      jp.nextToken();
+      return retVal;
+    }
+    catch (IOException e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void remove()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  private void init()
+  {
+    if (jp == null) {
+      try {
+        InputStream is = future.get();
+        if (is == null) {
+          throw new QueryInterruptedException(
+              new ResourceLimitExceededException(
+                  "query[%s] url[%s] timed out or max bytes limit reached.",
+                  query.getId(),
+                  url
+              ),
+              host
+          );
+        } else {
+          jp = objectMapper.getFactory().createParser(is);
+        }
+        final JsonToken nextToken = jp.nextToken();
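+        // A top-level object (instead of the expected result array) is a serialized error response; deserialize it and propagate it as a QueryInterruptedException.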
+        if (nextToken == JsonToken.START_OBJECT) {
+          QueryInterruptedException cause = jp.getCodec().readValue(jp, 
QueryInterruptedException.class);
+          throw new QueryInterruptedException(cause, host);
+        } else if (nextToken != JsonToken.START_ARRAY) {
+          throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url 
[%s]", jp.getCurrentToken(), url);
+        } else {
+          jp.nextToken();
+          objectCodec = jp.getCodec();
+        }
+      }
+      catch (IOException | InterruptedException | ExecutionException e) {
+        throw new RE(
+            e,
+            "Failure getting results for query[%s] url[%s] because of [%s]",
+            query == null ? null : query.getId(),
+            url,
+            e.getMessage()
+        );
+      }
+      catch (CancellationException e) {
+        throw new QueryInterruptedException(e, host);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    if (jp != null) {
+      jp.close();
+    }
+  }
+}
+
diff --git 
a/server/src/main/java/org/apache/druid/client/TimelineServerView.java 
b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
index 6ac5f62..89042c6 100644
--- a/server/src/main/java/org/apache/druid/client/TimelineServerView.java
+++ b/server/src/main/java/org/apache/druid/client/TimelineServerView.java
@@ -27,6 +27,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.concurrent.Executor;
 
 /**
@@ -36,6 +37,11 @@ public interface TimelineServerView extends ServerView
   @Nullable
   TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource);
 
+  /**
+   * Returns a list of {@link ImmutableDruidServer} representing all data servers known to this view.
+   */
+  List<ImmutableDruidServer> getDruidServers();
+
   <T> QueryRunner<T> getQueryRunner(DruidServer server);
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java 
b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 406703e..72feb9c 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -21,6 +21,7 @@ package org.apache.druid.discovery;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.selector.Server;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.curator.discovery.ServerDiscoverySelector;
@@ -134,6 +135,18 @@ public class DruidLeaderClient
   }
 
   /**
+   * Executes the request object aimed at the leader and processes the response with the given handler.
+   * Note: this method does not retry on errors or handle leader changes that occur during communication.
+   */
+  public <Intermediate, Final> ListenableFuture<Final> goAsync(
+      final Request request,
+      final HttpResponseHandler<Intermediate, Final> handler
+  )
+  {
+    return httpClient.go(request, handler);
+  }
+
+  /**
    * Executes a Request object aimed at the leader. Throws IOException if the 
leader cannot be located.
    */
   public FullResponseHolder go(
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
index 1ae2d18..309f0ef 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java
@@ -34,12 +34,9 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import 
org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
-import org.apache.druid.java.util.http.client.response.ClientResponse;
-import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.joda.time.Duration;
 
 import javax.servlet.http.HttpServletResponse;
@@ -233,11 +230,11 @@ public class ChangeRequestHttpSyncer<T>
                 }
 
                 try {
-                  if (responseHandler.status == 
HttpServletResponse.SC_NO_CONTENT) {
+                  if (responseHandler.getStatus() == 
HttpServletResponse.SC_NO_CONTENT) {
                     log.debug("Received NO CONTENT from server[%s]", 
logIdentity);
                     lastSuccessfulSyncTime = System.currentTimeMillis();
                     return;
-                  } else if (responseHandler.status != 
HttpServletResponse.SC_OK) {
+                  } else if (responseHandler.getStatus() != 
HttpServletResponse.SC_OK) {
                     handleFailure(new RE("Bad Sync Response."));
                     return;
                   }
@@ -326,8 +323,8 @@ public class ChangeRequestHttpSyncer<T>
               String logMsg = StringUtils.nonStrictFormat(
                   "failed to get sync response from [%s]. Return code [%s], 
Reason: [%s]",
                   logIdentity,
-                  responseHandler.status,
-                  responseHandler.description
+                  responseHandler.getStatus(),
+                  responseHandler.getDescription()
               );
 
               if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
@@ -420,20 +417,6 @@ public class ChangeRequestHttpSyncer<T>
     return false;
   }
 
-  private static class BytesAccumulatingResponseHandler extends 
InputStreamResponseHandler
-  {
-    private int status;
-    private String description;
-
-    @Override
-    public ClientResponse<AppendableByteArrayInputStream> 
handleResponse(HttpResponse response, TrafficCop trafficCop)
-    {
-      status = response.getStatus().getCode();
-      description = response.getStatus().getReasonPhrase();
-      return ClientResponse.unfinished(super.handleResponse(response, 
trafficCop).getObj());
-    }
-  }
-
   public interface Listener<T>
   {
     void fullSync(List<T> changes);
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/BytesAccumulatingResponseHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/BytesAccumulatingResponseHandler.java
new file mode 100644
index 0000000..c9e89c3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/BytesAccumulatingResponseHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.server.coordinator;
+
+import 
org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
+import org.apache.druid.java.util.http.client.response.ClientResponse;
+import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+
+/**
+ * An async response handler that records the HTTP status code and reason phrase and returns an unfinished response.
+ */
+public class BytesAccumulatingResponseHandler extends 
InputStreamResponseHandler
+{
+  private int status;
+  private String description;
+
+  @Override
+  public ClientResponse<AppendableByteArrayInputStream> 
handleResponse(HttpResponse response, TrafficCop trafficCop)
+  {
+    status = response.getStatus().getCode();
+    description = response.getStatus().getReasonPhrase();
+    return ClientResponse.unfinished(super.handleResponse(response, 
trafficCop).getObj());
+  }
+
+  public int getStatus()
+  {
+    return status;
+  }
+
+  public String getDescription()
+  {
+    return description;
+  }
+
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index e6faec5..4b44980 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -35,9 +35,6 @@ import 
org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
-import 
org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
-import org.apache.druid.java.util.http.client.response.ClientResponse;
-import 
org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeCallback;
 import org.apache.druid.server.coordination.DataSegmentChangeHandler;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
@@ -47,7 +44,6 @@ import 
org.apache.druid.server.coordination.SegmentLoadDropHandler;
 import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.joda.time.Duration;
 
 import javax.servlet.http.HttpServletResponse;
@@ -207,9 +203,9 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
             {
               boolean scheduleNextRunImmediately = true;
               try {
-                if (responseHandler.status == 
HttpServletResponse.SC_NO_CONTENT) {
+                if (responseHandler.getStatus() == 
HttpServletResponse.SC_NO_CONTENT) {
                   log.debug("Received NO CONTENT reseponse from [%s]", 
serverId);
-                } else if (HttpServletResponse.SC_OK == 
responseHandler.status) {
+                } else if (HttpServletResponse.SC_OK == 
responseHandler.getStatus()) {
                   try {
                     
List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = 
jsonMapper.readValue(
                         result, RESPONSE_ENTITY_TYPE_REF
@@ -260,7 +256,6 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
             public void onFailure(Throwable t)
             {
               try {
-                responseHandler.description = t.toString();
                 logRequestFailure(t);
               }
               finally {
@@ -274,8 +269,8 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
                   t,
                   "Request[%s] Failed with status[%s]. Reason[%s].",
                   changeRequestURL,
-                  responseHandler.status,
-                  responseHandler.description
+                  responseHandler.getStatus(),
+                  responseHandler.getDescription()
               );
             }
           },
@@ -596,17 +591,4 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
     }
   }
 
-  private static class BytesAccumulatingResponseHandler extends 
InputStreamResponseHandler
-  {
-    private int status;
-    private String description;
-
-    @Override
-    public ClientResponse<AppendableByteArrayInputStream> 
handleResponse(HttpResponse response, TrafficCop trafficCop)
-    {
-      status = response.getStatus().getCode();
-      description = response.getStatus().getReasonPhrase();
-      return ClientResponse.unfinished(super.handleResponse(response, 
trafficCop).getObj());
-    }
-  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java 
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 719a290..781842f 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.server.http;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterables;
@@ -26,6 +29,7 @@ import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.metadata.MetadataSegmentManager;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
@@ -46,10 +50,12 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  */
@@ -60,19 +66,22 @@ public class MetadataResource
   private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private final AuthConfig authConfig;
   private final AuthorizerMapper authorizerMapper;
+  private final ObjectMapper jsonMapper;
 
   @Inject
   public MetadataResource(
       MetadataSegmentManager metadataSegmentManager,
       IndexerMetadataStorageCoordinator metadataStorageCoordinator,
       AuthConfig authConfig,
-      AuthorizerMapper authorizerMapper
+      AuthorizerMapper authorizerMapper,
+      @Json ObjectMapper jsonMapper
   )
   {
     this.metadataSegmentManager = metadataSegmentManager;
     this.metadataStorageCoordinator = metadataStorageCoordinator;
     this.authConfig = authConfig;
     this.authorizerMapper = authorizerMapper;
+    this.jsonMapper = jsonMapper;
   }
 
   @GET
@@ -137,6 +146,39 @@ public class MetadataResource
   }
 
   @GET
+  @Path("/segments")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getDatabaseSegments(@Context final HttpServletRequest req)
+  {
+    final Collection<ImmutableDruidDataSource> druidDataSources = 
metadataSegmentManager.getInventory();
+    final Set<DataSegment> metadataSegments = druidDataSources
+        .stream()
+        .flatMap(t -> t.getSegments().stream())
+        .collect(Collectors.toSet());
+
+    Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> 
Collections.singletonList(
+        
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+    final Iterable<DataSegment> authorizedSegments = 
AuthorizationUtils.filterAuthorizedResources(
+        req, metadataSegments, raGenerator, authorizerMapper);
+
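+    // Stream the authorized segments out as a JSON array rather than materializing the full serialized list in memory.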
+    final StreamingOutput stream = outputStream -> {
+      final JsonFactory jsonFactory = jsonMapper.getFactory();
+      try (final JsonGenerator jsonGenerator = 
jsonFactory.createGenerator(outputStream)) {
+        jsonGenerator.writeStartArray();
+        for (DataSegment ds : authorizedSegments) {
+          jsonGenerator.writeObject(ds);
+          jsonGenerator.flush();
+        }
+        jsonGenerator.writeEndArray();
+      }
+    };
+
+    Response.ResponseBuilder builder = Response.status(Response.Status.OK);
+    return builder.entity(stream).build();
+  }
+
+  @GET
   @Path("/datasources/{dataSourceName}/segments")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(DatasourceResourceFilter.class)
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index f02ba43..9cec022 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -59,6 +59,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
@@ -251,6 +252,13 @@ public class CachingClusteredClientFunctionalityTest
             return timeline;
           }
 
+          @Nullable
+          @Override
+          public List<ImmutableDruidServer> getDruidServers()
+          {
+            throw new UnsupportedOperationException();
+          }
+
           @Override
           public void registerTimelineCallback(final Executor exec, final 
TimelineCallback callback)
           {
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index 83b4aee..573c5c9 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -2640,6 +2640,12 @@ public class CachingClusteredClientTest
           }
 
           @Override
+          public List<ImmutableDruidServer> getDruidServers()
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
           public <T> QueryRunner<T> getQueryRunner(DruidServer server)
           {
             return serverView.getQueryRunner(server);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
index 484a7ba..3601e15 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/Calcites.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlCollation;
 import org.apache.calcite.sql.SqlKind;
@@ -46,6 +45,7 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
 import org.apache.druid.sql.calcite.schema.InformationSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Days;
@@ -98,11 +98,16 @@ public class Calcites
     return DEFAULT_CHARSET;
   }
 
-  public static SchemaPlus createRootSchema(final Schema druidSchema, final 
AuthorizerMapper authorizerMapper)
+  public static SchemaPlus createRootSchema(
+      final DruidSchema druidSchema,
+      final SystemSchema systemSchema,
+      final AuthorizerMapper authorizerMapper
+  )
   {
     final SchemaPlus rootSchema = CalciteSchema.createRootSchema(false, 
false).plus();
     rootSchema.add(DruidSchema.NAME, druidSchema);
     rootSchema.add(InformationSchema.NAME, new InformationSchema(rootSchema, 
authorizerMapper));
+    rootSchema.add(SystemSchema.NAME, systemSchema);
     return rootSchema;
   }
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index a648013..1d1c1f6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -32,6 +32,7 @@ import org.apache.calcite.interpreter.BindableConvention;
 import org.apache.calcite.interpreter.BindableRel;
 import org.apache.calcite.interpreter.Bindables;
 import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
@@ -51,6 +52,7 @@ import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Pair;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.server.security.Access;
@@ -66,6 +68,7 @@ import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -309,19 +312,65 @@ public class DruidPlanner implements Closeable
     } else {
       final BindableRel theRel = bindableRel;
       final DataContext dataContext = 
plannerContext.createDataContext((JavaTypeFactory) planner.getTypeFactory());
-      final Supplier<Sequence<Object[]>> resultsSupplier = new 
Supplier<Sequence<Object[]>>()
-      {
-        @Override
-        public Sequence<Object[]> get()
-        {
-          final Enumerable enumerable = theRel.bind(dataContext);
-          return Sequences.simple(enumerable);
-        }
+      final Supplier<Sequence<Object[]>> resultsSupplier = () -> {
+        final Enumerable enumerable = theRel.bind(dataContext);
+        final Enumerator enumerator = enumerable.enumerator();
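+        // Wrap the enumerator in a lazily-iterated Sequence so results stream to the client; withBaggage closes the enumerator once the sequence is finished or abandoned.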
+        return Sequences.withBaggage(new BaseSequence<>(
+            new BaseSequence.IteratorMaker<Object[], 
EnumeratorIterator<Object[]>>()
+            {
+              @Override
+              public EnumeratorIterator make()
+              {
+                return new EnumeratorIterator(new Iterator<Object[]>()
+                {
+                  @Override
+                  public boolean hasNext()
+                  {
+                    return enumerator.moveNext();
+                  }
+
+                  @Override
+                  public Object[] next()
+                  {
+                    return (Object[]) enumerator.current();
+                  }
+                });
+              }
+
+              @Override
+              public void cleanup(EnumeratorIterator iterFromMake)
+              {
+
+              }
+            }
+        ), () -> enumerator.close());
       };
       return new PlannerResult(resultsSupplier, root.validatedRowType);
     }
   }
 
+  private static class EnumeratorIterator<T> implements Iterator<T>
+  {
+    private final Iterator<T> it;
+
+    public EnumeratorIterator(Iterator<T> it)
+    {
+      this.it = it;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      return it.hasNext();
+    }
+
+    @Override
+    public T next()
+    {
+      return it.next();
+    }
+  }
+
   private PlannerResult planExplanation(
       final RelNode rel,
       final SqlExplain explain
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
index 0b99f39..83f90cd 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerFactory.java
@@ -41,6 +41,7 @@ import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.calcite.rel.QueryMaker;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 
 import java.util.Map;
 import java.util.Properties;
@@ -57,6 +58,7 @@ public class PlannerFactory
       .build();
 
   private final DruidSchema druidSchema;
+  private final SystemSchema systemSchema;
   private final QueryLifecycleFactory queryLifecycleFactory;
   private final DruidOperatorTable operatorTable;
   private final ExprMacroTable macroTable;
@@ -67,6 +69,7 @@ public class PlannerFactory
   @Inject
   public PlannerFactory(
       final DruidSchema druidSchema,
+      final SystemSchema systemSchema,
       final QueryLifecycleFactory queryLifecycleFactory,
       final DruidOperatorTable operatorTable,
       final ExprMacroTable macroTable,
@@ -76,6 +79,7 @@ public class PlannerFactory
   )
   {
     this.druidSchema = druidSchema;
+    this.systemSchema = systemSchema;
     this.queryLifecycleFactory = queryLifecycleFactory;
     this.operatorTable = operatorTable;
     this.macroTable = macroTable;
@@ -86,7 +90,11 @@ public class PlannerFactory
 
   public DruidPlanner createPlanner(final Map<String, Object> queryContext)
   {
-    final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema, 
authorizerMapper);
+    final SchemaPlus rootSchema = Calcites.createRootSchema(
+        druidSchema,
+        systemSchema,
+        authorizerMapper
+    );
     final PlannerContext plannerContext = PlannerContext.create(
         operatorTable,
         macroTable,
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 08b54b0..5a64d08 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -51,6 +51,7 @@ import 
org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -91,7 +92,6 @@ public class DruidSchema extends AbstractSchema
   private static final int MAX_SEGMENTS_PER_QUERY = 15000;
 
   private final QueryLifecycleFactory queryLifecycleFactory;
-  private final TimelineServerView serverView;
   private final PlannerConfig config;
   private final ViewManager viewManager;
   private final ExecutorService cacheExec;
@@ -103,9 +103,10 @@ public class DruidSchema extends AbstractSchema
   // Protects access to segmentSignatures, mutableSegments, 
segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
   private final Object lock = new Object();
 
-  // DataSource -> Segment -> RowSignature for that segment.
+  // DataSource -> Segment -> SegmentMetadataHolder (contains RowSignature) for that segment.
   // Use TreeMap for segments so they are merged in deterministic order, from 
older to newer.
-  private final Map<String, TreeMap<DataSegment, RowSignature>> 
segmentSignatures = new HashMap<>();
+  // This data structure needs to be accessed in a thread-safe way since SystemSchema also accesses it.
+  private final Map<String, TreeMap<DataSegment, SegmentMetadataHolder>> 
segmentMetadataInfo = new HashMap<>();
 
   // All mutable segments.
   private final Set<DataSegment> mutableSegments = new 
TreeSet<>(SEGMENT_ORDER);
@@ -134,7 +135,7 @@ public class DruidSchema extends AbstractSchema
   )
   {
     this.queryLifecycleFactory = 
Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory");
-    this.serverView = Preconditions.checkNotNull(serverView, "serverView");
+    Preconditions.checkNotNull(serverView, "serverView");
     this.config = Preconditions.checkNotNull(config, "config");
     this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager");
     this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d");
@@ -320,25 +321,39 @@ public class DruidSchema extends AbstractSchema
   private void addSegment(final DruidServerMetadata server, final DataSegment 
segment)
   {
     synchronized (lock) {
-      final Map<DataSegment, RowSignature> knownSegments = 
segmentSignatures.get(segment.getDataSource());
+      final Map<DataSegment, SegmentMetadataHolder> knownSegments = 
segmentMetadataInfo.get(segment.getDataSource());
       if (knownSegments == null || !knownSegments.containsKey(segment)) {
+        // segmentReplicatable is used to determine whether segments are served by realtime servers or not
+        final long isRealtime = server.segmentReplicatable() ? 0 : 1;
+        final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 
: 0;
+        SegmentMetadataHolder holder = new 
SegmentMetadataHolder.Builder(isPublished, 1, isRealtime, 1).build();
         // Unknown segment.
-        setSegmentSignature(segment, null);
+        setSegmentSignature(segment, holder);
         segmentsNeedingRefresh.add(segment);
-
         if (!server.segmentReplicatable()) {
           log.debug("Added new mutable segment[%s].", segment.getIdentifier());
           mutableSegments.add(segment);
         } else {
           log.debug("Added new immutable segment[%s].", 
segment.getIdentifier());
         }
-      } else if (server.segmentReplicatable()) {
-        // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
-        // even if it's also available on non-replicatable (realtime) servers.
-        mutableSegments.remove(segment);
-        log.debug("Segment[%s] has become immutable.", segment.getIdentifier());
+      } else {
+        if (knownSegments.containsKey(segment)) {
+          SegmentMetadataHolder holder = knownSegments.get(segment);
+          SegmentMetadataHolder holderWithNumReplicas = new SegmentMetadataHolder.Builder(
+              holder.isPublished(),
+              holder.isAvailable(),
+              holder.isRealtime(),
+              holder.getNumReplicas()
+          ).withNumReplicas(holder.getNumReplicas() + 1).build();
+          knownSegments.put(segment, holderWithNumReplicas);
+        }
+        if (server.segmentReplicatable()) {
+          // If a segment shows up on a replicatable (historical) server at any point, then it must be immutable,
+          // even if it's also available on non-replicatable (realtime) servers.
+          mutableSegments.remove(segment);
+          log.debug("Segment[%s] has become immutable.", segment.getIdentifier());
+        }
       }
-
       if (!tables.containsKey(segment.getDataSource())) {
         refreshImmediately = true;
       }
@@ -356,11 +371,11 @@ public class DruidSchema extends AbstractSchema
       segmentsNeedingRefresh.remove(segment);
       mutableSegments.remove(segment);
 
-      final Map<DataSegment, RowSignature> dataSourceSegments = segmentSignatures.get(segment.getDataSource());
+      final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
       dataSourceSegments.remove(segment);
 
       if (dataSourceSegments.isEmpty()) {
-        segmentSignatures.remove(segment.getDataSource());
+        segmentMetadataInfo.remove(segment.getDataSource());
         tables.remove(segment.getDataSource());
        log.info("Removed all metadata for dataSource[%s].", segment.getDataSource());
       }
@@ -431,10 +446,21 @@ public class DruidSchema extends AbstractSchema
         if (segment == null) {
          log.warn("Got analysis for segment[%s] we didn't ask for, ignoring.", analysis.getId());
         } else {
-          final RowSignature rowSignature = analysisToRowSignature(analysis);
-          log.debug("Segment[%s] has signature[%s].", segment.getIdentifier(), rowSignature);
-          setSegmentSignature(segment, rowSignature);
-          retVal.add(segment);
+          synchronized (lock) {
+            final RowSignature rowSignature = analysisToRowSignature(analysis);
+            log.debug("Segment[%s] has signature[%s].", segment.getIdentifier(), rowSignature);
+            final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource());
+            SegmentMetadataHolder holder = dataSourceSegments.get(segment);
+            SegmentMetadataHolder updatedHolder = new SegmentMetadataHolder.Builder(
+                holder.isPublished(),
+                holder.isAvailable(),
+                holder.isRealtime(),
+                holder.getNumReplicas()
+            ).withRowSignature(rowSignature).withNumRows(analysis.getNumRows()).build();
+            dataSourceSegments.put(segment, updatedHolder);
+            setSegmentSignature(segment, updatedHolder);
+            retVal.add(segment);
+          }
         }
 
         yielder = yielder.next(null);
@@ -455,22 +481,23 @@ public class DruidSchema extends AbstractSchema
     return retVal;
   }
 
-  private void setSegmentSignature(final DataSegment segment, final RowSignature rowSignature)
+  private void setSegmentSignature(final DataSegment segment, final SegmentMetadataHolder segmentMetadataHolder)
   {
     synchronized (lock) {
-      segmentSignatures.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER))
-                       .put(segment, rowSignature);
+      segmentMetadataInfo.computeIfAbsent(segment.getDataSource(), x -> new TreeMap<>(SEGMENT_ORDER))
+                         .put(segment, segmentMetadataHolder);
     }
   }
 
   private DruidTable buildDruidTable(final String dataSource)
   {
     synchronized (lock) {
-      final TreeMap<DataSegment, RowSignature> segmentMap = segmentSignatures.get(dataSource);
+      final TreeMap<DataSegment, SegmentMetadataHolder> segmentMap = segmentMetadataInfo.get(dataSource);
       final Map<String, ValueType> columnTypes = new TreeMap<>();
 
       if (segmentMap != null) {
-        for (RowSignature rowSignature : segmentMap.values()) {
+        for (SegmentMetadataHolder segmentMetadataHolder : segmentMap.values()) {
+          final RowSignature rowSignature = segmentMetadataHolder.getRowSignature();
           if (rowSignature != null) {
             for (String column : rowSignature.getRowOrder()) {
               // Newer column types should override older ones.
@@ -520,7 +547,6 @@ public class DruidSchema extends AbstractSchema
  private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis)
   {
     final RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
-
    for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
       if (entry.getValue().isError()) {
         // Skip columns with analysis errors.
@@ -539,7 +565,17 @@ public class DruidSchema extends AbstractSchema
 
       rowSignatureBuilder.add(entry.getKey(), valueType);
     }
-
     return rowSignatureBuilder.build();
   }
+
+  public Map<DataSegment, SegmentMetadataHolder> getSegmentMetadata()
+  {
+    final Map<DataSegment, SegmentMetadataHolder> segmentMetadata = new HashMap<>();
+    synchronized (lock) {
+      for (TreeMap<DataSegment, SegmentMetadataHolder> val : segmentMetadataInfo.values()) {
+        segmentMetadata.putAll(val);
+      }
+    }
+    return segmentMetadata;
+  }
 }
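
The DruidSchema changes above follow a copy-on-write discipline: SegmentMetadataHolder values are immutable, so writers replace a whole value while holding the lock, and SystemSchema reads through getSegmentMetadata(), which copies the map into a snapshot before iterating. A minimal sketch of that pattern, with hypothetical names (ReplicaCounter and Holder are illustrative, not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    class ReplicaCounter
    {
      // Immutable value object; "replicas" stands in for SegmentMetadataHolder's fields.
      static final class Holder
      {
        final long replicas;

        Holder(long replicas)
        {
          this.replicas = replicas;
        }

        Holder withOneMoreReplica()
        {
          return new Holder(replicas + 1);
        }
      }

      private final Object lock = new Object();
      private final Map<String, Holder> holders = new HashMap<>();

      void addSegment(String segmentId)
      {
        synchronized (lock) {
          final Holder known = holders.get(segmentId);
          // First sighting gets one replica; later sightings replace the
          // immutable value rather than mutating it in place.
          holders.put(segmentId, known == null ? new Holder(1) : known.withOneMoreReplica());
        }
      }

      Map<String, Holder> snapshot()
      {
        synchronized (lock) {
          // Readers iterate the copy without holding the lock.
          return new HashMap<>(holders);
        }
      }
    }
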
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
new file mode 100644
index 0000000..f3b13bb
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.sql.calcite.schema;
+
+import org.apache.druid.sql.calcite.table.RowSignature;
+
+import javax.annotation.Nullable;
+
+/**
+ * Immutable representation of RowSignature and other segment attributes
+ * needed by {@link SystemSchema.SegmentsTable}.
+ */
+public class SegmentMetadataHolder
+{
+
+  // Booleans are represented as the long type, where 1 = true and 0 = false,
+  // to make it easy to count the number of segments that are
+  // published, available, realtime, etc.
+  private final long isPublished;
+  private final long isAvailable;
+  private final long isRealtime;
+
+  private final long numReplicas;
+  @Nullable
+  private final RowSignature rowSignature;
+  @Nullable
+  private final Long numRows;
+
+  private SegmentMetadataHolder(Builder builder)
+  {
+    this.rowSignature = builder.rowSignature;
+    this.isPublished = builder.isPublished;
+    this.isAvailable = builder.isAvailable;
+    this.isRealtime = builder.isRealtime;
+    this.numReplicas = builder.numReplicas;
+    this.numRows = builder.numRows;
+  }
+
+  public long isPublished()
+  {
+    return isPublished;
+  }
+
+  public long isAvailable()
+  {
+    return isAvailable;
+  }
+
+  public long isRealtime()
+  {
+    return isRealtime;
+  }
+
+  public long getNumReplicas()
+  {
+    return numReplicas;
+  }
+
+  @Nullable
+  public Long getNumRows()
+  {
+    return numRows;
+  }
+
+  @Nullable
+  public RowSignature getRowSignature()
+  {
+    return rowSignature;
+  }
+
+  public static class Builder
+  {
+    private final long isPublished;
+    private final long isAvailable;
+    private final long isRealtime;
+    private long numReplicas;
+    @Nullable
+    private RowSignature rowSignature;
+    @Nullable
+    private Long numRows;
+
+    public Builder(
+        long isPublished,
+        long isAvailable,
+        long isRealtime,
+        long numReplicas
+    )
+    {
+      this.isPublished = isPublished;
+      this.isAvailable = isAvailable;
+      this.isRealtime = isRealtime;
+      this.numReplicas = numReplicas;
+    }
+
+    public Builder withRowSignature(RowSignature rowSignature)
+    {
+      this.rowSignature = rowSignature;
+      return this;
+    }
+
+    public Builder withNumRows(Long numRows)
+    {
+      this.numRows = numRows;
+      return this;
+    }
+
+    public Builder withNumReplicas(long numReplicas)
+    {
+      this.numReplicas = numReplicas;
+      return this;
+    }
+
+    public SegmentMetadataHolder build()
+    {
+      return new SegmentMetadataHolder(this);
+    }
+  }
+
+}
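
SegmentMetadataHolder has no setters by design: every update in DruidSchema builds a fresh holder from the old one. A short usage sketch mirroring the diff above (the 1/0 flag values are illustrative only, not taken from a real cluster):

    // Initial sighting of a segment: published, available, batch (not realtime), one replica.
    SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(1, 1, 0, 1).build();

    // A second server reports the same segment: derive a new holder with the bumped count.
    SegmentMetadataHolder updated = new SegmentMetadataHolder.Builder(
        holder.isPublished(),
        holder.isAvailable(),
        holder.isRealtime(),
        holder.getNumReplicas()
    ).withNumReplicas(holder.getNumReplicas() + 1).build();
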
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
new file mode 100644
index 0000000..da34bbe
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.inject.Inject;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.DefaultEnumerable;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.client.TimelineServerView;
+import org.apache.druid.client.coordinator.Coordinator;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.table.RowSignature;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class SystemSchema extends AbstractSchema
+{
+  private static final Logger log = new Logger(SystemSchema.class);
+
+  public static final String NAME = "sys";
+  private static final String SEGMENTS_TABLE = "segments";
+  private static final String SERVERS_TABLE = "servers";
+  private static final String SERVER_SEGMENTS_TABLE = "server_segments";
+  private static final String TASKS_TABLE = "tasks";
+
+  private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
+      .builder()
+      .add("segment_id", ValueType.STRING)
+      .add("datasource", ValueType.STRING)
+      .add("start", ValueType.STRING)
+      .add("end", ValueType.STRING)
+      .add("size", ValueType.LONG)
+      .add("version", ValueType.STRING)
+      .add("partition_num", ValueType.STRING)
+      .add("num_replicas", ValueType.LONG)
+      .add("num_rows", ValueType.LONG)
+      .add("is_published", ValueType.LONG)
+      .add("is_available", ValueType.LONG)
+      .add("is_realtime", ValueType.LONG)
+      .add("payload", ValueType.STRING)
+      .build();
+
+  private static final RowSignature SERVERS_SIGNATURE = RowSignature
+      .builder()
+      .add("server", ValueType.STRING)
+      .add("host", ValueType.STRING)
+      .add("plaintext_port", ValueType.STRING)
+      .add("tls_port", ValueType.STRING)
+      .add("server_type", ValueType.STRING)
+      .add("tier", ValueType.STRING)
+      .add("curr_size", ValueType.LONG)
+      .add("max_size", ValueType.LONG)
+      .build();
+
+  private static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
+      .builder()
+      .add("server", ValueType.STRING)
+      .add("segment_id", ValueType.STRING)
+      .build();
+
+  private static final RowSignature TASKS_SIGNATURE = RowSignature
+      .builder()
+      .add("task_id", ValueType.STRING)
+      .add("type", ValueType.STRING)
+      .add("datasource", ValueType.STRING)
+      .add("created_time", ValueType.STRING)
+      .add("queue_insertion_time", ValueType.STRING)
+      .add("status", ValueType.STRING)
+      .add("runner_status", ValueType.STRING)
+      .add("duration", ValueType.LONG)
+      .add("location", ValueType.STRING)
+      .add("host", ValueType.STRING)
+      .add("plaintext_port", ValueType.STRING)
+      .add("tls_port", ValueType.STRING)
+      .add("error_msg", ValueType.STRING)
+      .build();
+
+  private final Map<String, Table> tableMap;
+
+  @Inject
+  public SystemSchema(
+      final DruidSchema druidSchema,
+      final TimelineServerView serverView,
+      final AuthorizerMapper authorizerMapper,
+      final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient,
+      final @IndexingService DruidLeaderClient overlordDruidLeaderClient,
+      final ObjectMapper jsonMapper
+  )
+  {
+    Preconditions.checkNotNull(serverView, "serverView");
+    BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
+    this.tableMap = ImmutableMap.of(
+        SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
+        SERVERS_TABLE, new ServersTable(serverView, authorizerMapper),
+        SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
+        TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
+    );
+  }
+
+  @Override
+  public Map<String, Table> getTableMap()
+  {
+    return tableMap;
+  }
+
+  static class SegmentsTable extends AbstractTable implements ScannableTable
+  {
+    private final DruidSchema druidSchema;
+    private final DruidLeaderClient druidLeaderClient;
+    private final ObjectMapper jsonMapper;
+    private final BytesAccumulatingResponseHandler responseHandler;
+    private final AuthorizerMapper authorizerMapper;
+
+    public SegmentsTable(
+        DruidSchema druidSchema,
+        DruidLeaderClient druidLeaderClient,
+        ObjectMapper jsonMapper,
+        BytesAccumulatingResponseHandler responseHandler,
+        AuthorizerMapper authorizerMapper
+    )
+    {
+      this.druidSchema = druidSchema;
+      this.druidLeaderClient = druidLeaderClient;
+      this.jsonMapper = jsonMapper;
+      this.responseHandler = responseHandler;
+      this.authorizerMapper = authorizerMapper;
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory)
+    {
+      return SEGMENTS_SIGNATURE.getRelDataType(typeFactory);
+    }
+
+    @Override
+    public TableType getJdbcTableType()
+    {
+      return TableType.SYSTEM_TABLE;
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root)
+    {
+      // get available segments from druidSchema
+      final Map<DataSegment, SegmentMetadataHolder> availableSegmentMetadata = druidSchema.getSegmentMetadata();
+      final Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries =
+          availableSegmentMetadata.entrySet().iterator();
+
+      // get published segments from coordinator
+      final JsonParserIterator<DataSegment> metadataSegments = getMetadataSegments(
+          druidLeaderClient,
+          jsonMapper,
+          responseHandler
+      );
+
+      Set<String> availableSegmentIds = new HashSet<>();
+      // auth check for available segments
+      final Iterator<Entry<DataSegment, SegmentMetadataHolder>> authorizedAvailableSegments = getAuthorizedAvailableSegments(
+          availableSegmentEntries,
+          root
+      );
+
+      final FluentIterable<Object[]> availableSegments = FluentIterable
+          .from(() -> authorizedAvailableSegments)
+          .transform(val -> {
+            try {
+              availableSegmentIds.add(val.getKey().getIdentifier());
+              return new Object[]{
+                  val.getKey().getIdentifier(),
+                  val.getKey().getDataSource(),
+                  val.getKey().getInterval().getStart(),
+                  val.getKey().getInterval().getEnd(),
+                  val.getKey().getSize(),
+                  val.getKey().getVersion(),
+                  val.getKey().getShardSpec().getPartitionNum(),
+                  val.getValue().getNumReplicas(),
+                  val.getValue().getNumRows(),
+                  val.getValue().isPublished(),
+                  val.getValue().isAvailable(),
+                  val.getValue().isRealtime(),
+                  jsonMapper.writeValueAsString(val.getKey())
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(StringUtils.format(
+                  "Error getting segment payload for segment %s",
+                  val.getKey().getIdentifier()
+              ), e);
+            }
+          });
+
+      // auth check for published segments
+      final CloseableIterator<DataSegment> authorizedPublishedSegments = getAuthorizedPublishedSegments(
+          metadataSegments,
+          root
+      );
+      final FluentIterable<Object[]> publishedSegments = FluentIterable
+          .from(() -> authorizedPublishedSegments)
+          .transform(val -> {
+            try {
+              if (availableSegmentIds.contains(val.getIdentifier())) {
+                return null;
+              }
+              return new Object[]{
+                  val.getIdentifier(),
+                  val.getDataSource(),
+                  val.getInterval().getStart(),
+                  val.getInterval().getEnd(),
+                  val.getSize(),
+                  val.getVersion(),
+                  val.getShardSpec().getPartitionNum(),
+                  0L,
+                  -1L,
+                  1L,
+                  0L,
+                  0L,
+                  jsonMapper.writeValueAsString(val)
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(StringUtils.format(
+                  "Error getting segment payload for segment %s",
+                  val.getIdentifier()
+              ), e);
+            }
+          });
+
+      final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
+          Iterables.concat(availableSegments, publishedSegments));
+
+      return Linq4j.asEnumerable(allSegments).where(t -> t != null);
+
+    }
+
+    private Iterator<Entry<DataSegment, SegmentMetadataHolder>> getAuthorizedAvailableSegments(
+        Iterator<Entry<DataSegment, SegmentMetadataHolder>> availableSegmentEntries,
+        DataContext root
+    )
+    {
+      final AuthenticationResult authenticationResult =
+          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+
+      Function<Entry<DataSegment, SegmentMetadataHolder>, Iterable<ResourceAction>> raGenerator = segment -> Collections
+          .singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getKey().getDataSource()));
+
+      final Iterable<Entry<DataSegment, SegmentMetadataHolder>> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          authenticationResult, () -> availableSegmentEntries, raGenerator, authorizerMapper);
+
+      return authorizedSegments.iterator();
+    }
+
+    private CloseableIterator<DataSegment> getAuthorizedPublishedSegments(
+        JsonParserIterator<DataSegment> it,
+        DataContext root
+    )
+    {
+      final AuthenticationResult authenticationResult =
+          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+
+      Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
+
+      final Iterable<DataSegment> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
+          authenticationResult, () -> it, raGenerator, authorizerMapper);
+
+      return wrap(authorizedSegments.iterator(), it);
+    }
+  }
+
+  // Note that the Coordinator must be up to get segments.
+  private static JsonParserIterator<DataSegment> getMetadataSegments(
+      DruidLeaderClient coordinatorClient,
+      ObjectMapper jsonMapper,
+      BytesAccumulatingResponseHandler responseHandler
+  )
+  {
+
+    Request request;
+    try {
+      request = coordinatorClient.makeRequest(
+          HttpMethod.GET,
+          StringUtils.format("/druid/coordinator/v1/metadata/segments")
+      );
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    ListenableFuture<InputStream> future = coordinatorClient.goAsync(
+        request,
+        responseHandler
+    );
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<DataSegment>()
+    {
+    });
+    return new JsonParserIterator<>(
+        typeRef,
+        future,
+        request.getUrl().toString(),
+        null,
+        request.getUrl().getHost(),
+        jsonMapper
+    );
+  }
+
+  static class ServersTable extends AbstractTable implements ScannableTable
+  {
+    private final TimelineServerView serverView;
+    private final AuthorizerMapper authorizerMapper;
+
+    public ServersTable(TimelineServerView serverView, AuthorizerMapper authorizerMapper)
+    {
+      this.serverView = serverView;
+      this.authorizerMapper = authorizerMapper;
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory)
+    {
+      return SERVERS_SIGNATURE.getRelDataType(typeFactory);
+    }
+
+    @Override
+    public TableType getJdbcTableType()
+    {
+      return TableType.SYSTEM_TABLE;
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root)
+    {
+      final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
+      final AuthenticationResult authenticationResult =
+          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+      final Access access = AuthorizationUtils.authorizeAllResourceActions(
+          authenticationResult,
+          Collections.singletonList(new ResourceAction(new Resource("STATE", ResourceType.STATE), Action.READ)),
+          authorizerMapper
+      );
+      if (!access.isAllowed()) {
+        throw new ForbiddenException("Insufficient permission to view servers: " + access.toString());
+      }
+      }
+      final FluentIterable<Object[]> results = FluentIterable
+          .from(druidServers)
+          .transform(val -> new Object[]{
+              val.getHost(),
+              val.getHost().split(":")[0],
+              val.getHostAndPort() == null ? -1 : val.getHostAndPort().split(":")[1],
+              val.getHostAndTlsPort() == null ? -1 : val.getHostAndTlsPort().split(":")[1],
+              val.getType(),
+              val.getTier(),
+              val.getCurrSize(),
+              val.getMaxSize()
+          });
+      return Linq4j.asEnumerable(results);
+    }
+  }
+
+  private static class ServerSegmentsTable extends AbstractTable implements ScannableTable
+  {
+    private final TimelineServerView serverView;
+    final AuthorizerMapper authorizerMapper;
+
+    public ServerSegmentsTable(TimelineServerView serverView, AuthorizerMapper authorizerMapper)
+    {
+      this.serverView = serverView;
+      this.authorizerMapper = authorizerMapper;
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory)
+    {
+      return SERVER_SEGMENTS_SIGNATURE.getRelDataType(typeFactory);
+    }
+
+    @Override
+    public TableType getJdbcTableType()
+    {
+      return TableType.SYSTEM_TABLE;
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root)
+    {
+      final List<Object[]> rows = new ArrayList<>();
+      final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
+      final int serverSegmentsTableSize = SERVER_SEGMENTS_SIGNATURE.getRowOrder().size();
+      for (ImmutableDruidServer druidServer : druidServers) {
+        final Map<String, DataSegment> segmentMap = druidServer.getSegments();
+        for (DataSegment segment : segmentMap.values()) {
+          Object[] row = new Object[serverSegmentsTableSize];
+          row[0] = druidServer.getHost();
+          row[1] = segment.getIdentifier();
+          rows.add(row);
+        }
+      }
+      return Linq4j.asEnumerable(rows);
+    }
+  }
+
+  static class TasksTable extends AbstractTable implements ScannableTable
+  {
+    private final DruidLeaderClient druidLeaderClient;
+    private final ObjectMapper jsonMapper;
+    private final BytesAccumulatingResponseHandler responseHandler;
+    private final AuthorizerMapper authorizerMapper;
+
+    public TasksTable(
+        DruidLeaderClient druidLeaderClient,
+        ObjectMapper jsonMapper,
+        BytesAccumulatingResponseHandler responseHandler,
+        AuthorizerMapper authorizerMapper
+    )
+    {
+      this.druidLeaderClient = druidLeaderClient;
+      this.jsonMapper = jsonMapper;
+      this.responseHandler = responseHandler;
+      this.authorizerMapper = authorizerMapper;
+    }
+
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory)
+    {
+      return TASKS_SIGNATURE.getRelDataType(typeFactory);
+    }
+
+    @Override
+    public TableType getJdbcTableType()
+    {
+      return TableType.SYSTEM_TABLE;
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root)
+    {
+      class TasksEnumerable extends DefaultEnumerable<Object[]>
+      {
+        private final CloseableIterator<TaskStatusPlus> it;
+
+        public TasksEnumerable(JsonParserIterator<TaskStatusPlus> tasks)
+        {
+          this.it = getAuthorizedTasks(tasks, root);
+        }
+
+        @Override
+        public Iterator<Object[]> iterator()
+        {
+          throw new UnsupportedOperationException("Do not use iterator(), it cannot be closed.");
+        }
+
+        @Override
+        public Enumerator<Object[]> enumerator()
+        {
+          return new Enumerator<Object[]>()
+          {
+            @Override
+            public Object[] current()
+            {
+              TaskStatusPlus task = it.next();
+              return new Object[]{
+                  task.getId(),
+                  task.getType(),
+                  task.getDataSource(),
+                  task.getCreatedTime(),
+                  task.getQueueInsertionTime(),
+                  task.getStatusCode(),
+                  task.getRunnerStatusCode(),
+                  task.getDuration(),
+                  // host:port, preferring the TLS port when one is set
+                  task.getLocation().getHost() + ":"
+                  + (task.getLocation().getTlsPort() == -1 ? task.getLocation().getPort() : task.getLocation().getTlsPort()),
+                  task.getLocation().getHost(),
+                  task.getLocation().getPort(),
+                  task.getLocation().getTlsPort(),
+                  task.getErrorMsg()
+              };
+            }
+
+            @Override
+            public boolean moveNext()
+            {
+              return it.hasNext();
+            }
+
+            @Override
+            public void reset()
+            {
+
+            }
+
+            @Override
+            public void close()
+            {
+              try {
+                it.close();
+              }
+              catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+
+      return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler));
+    }
+
+    private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(JsonParserIterator<TaskStatusPlus> it, DataContext root)
+    {
+      final AuthenticationResult authenticationResult =
+          (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
+
+      Function<TaskStatusPlus, Iterable<ResourceAction>> raGenerator = task -> Collections.singletonList(
+          AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(task.getDataSource()));
+
+      final Iterable<TaskStatusPlus> authorizedTasks = AuthorizationUtils.filterAuthorizedResources(
+          authenticationResult, () -> it, raGenerator, authorizerMapper);
+
+      return wrap(authorizedTasks.iterator(), it);
+    }
+
+  }
+
+  // Note that the Overlord must be up to get tasks.
+  private static JsonParserIterator<TaskStatusPlus> getTasks(
+      DruidLeaderClient indexingServiceClient,
+      ObjectMapper jsonMapper,
+      BytesAccumulatingResponseHandler responseHandler
+  )
+  {
+
+    Request request;
+    try {
+      request = indexingServiceClient.makeRequest(
+          HttpMethod.GET,
+          StringUtils.format("/druid/indexer/v1/tasks")
+      );
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    ListenableFuture<InputStream> future = indexingServiceClient.goAsync(
+        request,
+        responseHandler
+    );
+    final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference<TaskStatusPlus>()
+    {
+    });
+    return new JsonParserIterator<>(
+        typeRef,
+        future,
+        request.getUrl().toString(),
+        null,
+        request.getUrl().getHost(),
+        jsonMapper
+    );
+  }
+
+  private static <T> CloseableIterator<T> wrap(Iterator<T> iterator, JsonParserIterator<T> it)
+  {
+    return new CloseableIterator<T>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        final boolean hasNext = iterator.hasNext();
+        if (!hasNext) {
+          try {
+            it.close();
+          }
+          catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return hasNext;
+      }
+
+      @Override
+      public T next()
+      {
+        return iterator.next();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        it.close();
+      }
+    };
+  }
+
+}
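
With a Broker running, the new sys tables can be queried like any other SQL schema, for example over Avatica JDBC. A sketch under stated assumptions: localhost:8082 as the Broker address and /druid/v2/sql/avatica/ as its Avatica endpoint are Druid's usual defaults, not something this patch pins down.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SysSegmentsExample
    {
      public static void main(String[] args) throws Exception
      {
        // Assumed Broker address and Avatica endpoint.
        final String url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/";
        try (Connection connection = DriverManager.getConnection(url);
             Statement statement = connection.createStatement();
             // Replica and segment counts come straight from the new sys.segments table.
             ResultSet resultSet = statement.executeQuery(
                 "SELECT datasource, COUNT(*) AS num_segments, SUM(num_replicas) AS total_replicas\n"
                 + "FROM sys.segments GROUP BY datasource")) {
          while (resultSet.next()) {
            System.out.printf(
                "%s: %d segments, %d replicas%n",
                resultSet.getString(1),
                resultSet.getLong(2),
                resultSet.getLong(3)
            );
          }
        }
      }
    }
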
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 89c1e5f..4831421 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -153,6 +154,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
 
@@ -177,6 +179,7 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
     druidMeta = new DruidMeta(
         new PlannerFactory(
             druidSchema,
+            systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
             macroTable,
@@ -752,12 +755,14 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
 
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     final List<Meta.Frame> frames = new ArrayList<>();
     DruidMeta smallFrameDruidMeta = new DruidMeta(
         new PlannerFactory(
             druidSchema,
+            systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
             macroTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
index 56a2700..77a44dc 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -86,10 +87,12 @@ public class DruidStatementTest extends CalciteTestBase
     walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder());
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     plannerFactory = new PlannerFactory(
         druidSchema,
+        systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
         macroTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 30e8670..81bb091 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -106,6 +106,7 @@ import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.planner.PlannerResult;
 import org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -468,6 +469,7 @@ public class CalciteQueryTest extends CalciteTestBase
         ImmutableList.of(),
         ImmutableList.of(
             new Object[]{"druid"},
+            new Object[]{"sys"},
             new Object[]{"INFORMATION_SCHEMA"}
         )
     );
@@ -488,7 +490,11 @@ public class CalciteQueryTest extends CalciteTestBase
             new Object[]{"druid", "bview", "VIEW"},
             new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"},
             new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"},
-            new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}
+            new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"},
+            new Object[]{"sys", "segments", "SYSTEM_TABLE"},
+            new Object[]{"sys", "server_segments", "SYSTEM_TABLE"},
+            new Object[]{"sys", "servers", "SYSTEM_TABLE"},
+            new Object[]{"sys", "tasks", "SYSTEM_TABLE"}
         )
     );
 
@@ -507,7 +513,11 @@ public class CalciteQueryTest extends CalciteTestBase
             new Object[]{"druid", "bview", "VIEW"},
             new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"},
             new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"},
-            new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}
+            new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"},
+            new Object[]{"sys", "segments", "SYSTEM_TABLE"},
+            new Object[]{"sys", "server_segments", "SYSTEM_TABLE"},
+            new Object[]{"sys", "servers", "SYSTEM_TABLE"},
+            new Object[]{"sys", "tasks", "SYSTEM_TABLE"}
         )
     );
   }
@@ -7710,11 +7720,13 @@ public class CalciteQueryTest extends CalciteTestBase
   {
     final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR);
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
 
     final PlannerFactory plannerFactory = new PlannerFactory(
         druidSchema,
+        systemSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
         operatorTable,
         macroTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
index 99b7024..59374e3 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryLogHook;
@@ -113,6 +114,7 @@ public class SqlResourceTest extends CalciteTestBase
 
     final PlannerConfig plannerConfig = new PlannerConfig();
     final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig);
+    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
     final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
     final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
     req = EasyMock.createStrictMock(HttpServletRequest.class);
@@ -134,6 +136,7 @@ public class SqlResourceTest extends CalciteTestBase
         JSON_MAPPER,
         new PlannerFactory(
             druidSchema,
+            systemSchema,
             CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
             operatorTable,
             macroTable,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
new file mode 100644
index 0000000..9b90733
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.druid.sql.calcite.schema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.SettableFuture;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.client.DirectDruidClient;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.TimelineServerView;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
+import org.apache.druid.java.util.http.client.response.FullResponseHolder;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.Authorizer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.NoopEscalator;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.util.CalciteTestBase;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.sql.calcite.util.TestServerInventoryView;
+import org.apache.druid.sql.calcite.view.NoopViewManager;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class SystemSchemaTest extends CalciteTestBase
+{
+  private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
+
+  private static final List<InputRow> ROWS1 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-02", "m1", "2.0", "dim1", "10.1")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-03", "m1", "3.0", "dim1", "2"))
+  );
+
+  private static final List<InputRow> ROWS2 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "4.0", "dim2", ImmutableList.of("a"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "5.0", "dim2", ImmutableList.of("abc"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
+  );
+
+  private SystemSchema schema;
+  private SpecificSegmentsQuerySegmentWalker walker;
+  private DruidLeaderClient client;
+  private TimelineServerView serverView;
+  private ObjectMapper mapper;
+  private FullResponseHolder responseHolder;
+  private BytesAccumulatingResponseHandler responseHandler;
+  private Request request;
+  private DruidSchema druidSchema;
+  private AuthorizerMapper authMapper;
+  private static QueryRunnerFactoryConglomerate conglomerate;
+  private static Closer resourceCloser;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpClass()
+  {
+    final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair =
+        CalciteTests.createQueryRunnerFactoryConglomerate();
+    conglomerate = conglomerateCloserPair.lhs;
+    resourceCloser = conglomerateCloserPair.rhs;
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IOException
+  {
+    resourceCloser.close();
+  }
+
+  @Before
+  public void setUp() throws Exception
+  {
+    serverView = EasyMock.createNiceMock(TimelineServerView.class);
+    client = EasyMock.createMock(DruidLeaderClient.class);
+    mapper = TestHelper.makeJsonMapper();
+    responseHolder = EasyMock.createMock(FullResponseHolder.class);
+    responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class)
+                              .withConstructor()
+                              .addMockedMethod(
+                                  "handleResponse",
+                                  HttpResponse.class,
+                                  HttpResponseHandler.TrafficCop.class
+                              )
+                              .addMockedMethod("getStatus")
+                              .createMock();
+    request = EasyMock.createMock(Request.class);
+    authMapper = new AuthorizerMapper(null)
+    {
+      @Override
+      public Authorizer getAuthorizer(String name)
+      {
+        return (authenticationResult, resource, action) -> new Access(true);
+      }
+    };
+
+    final File tmpDir = temporaryFolder.newFolder();
+    final QueryableIndex index1 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "1"))
+                                              .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                              .schema(
+                                                  new IncrementalIndexSchema.Builder()
+                                                      .withMetrics(
+                                                          new CountAggregatorFactory("cnt"),
+                                                          new DoubleSumAggregatorFactory("m1", "m1"),
+                                                          new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                                      )
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(ROWS1)
+                                              .buildMMappedIndex();
+
+    final QueryableIndex index2 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "2"))
+                                              .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                              .schema(
+                                                  new IncrementalIndexSchema.Builder()
+                                                      .withMetrics(new LongSumAggregatorFactory("m1", "m1"))
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(ROWS2)
+                                              .buildMMappedIndex();
+
+    walker = new SpecificSegmentsQuerySegmentWalker(conglomerate)
+        .add(segment1, index1)
+        .add(segment2, index2)
+        .add(segment2, index2)
+        .add(segment3, index2);
+
+    druidSchema = new DruidSchema(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        new TestServerInventoryView(walker.getSegments()),
+        PLANNER_CONFIG_DEFAULT,
+        new NoopViewManager(),
+        new NoopEscalator()
+    );
+    druidSchema.start();
+    druidSchema.awaitInitialization();
+    schema = new SystemSchema(
+        druidSchema,
+        serverView,
+        EasyMock.createStrictMock(AuthorizerMapper.class),
+        client,
+        client,
+        mapper
+    );
+  }
+
+  private final DataSegment segment1 = new DataSegment(
+      "test1",
+      Intervals.of("2010/2011"),
+      "version1",
+      null,
+      ImmutableList.of("dim1", "dim2"),
+      ImmutableList.of("met1", "met2"),
+      null,
+      1,
+      100L,
+      DataSegment.PruneLoadSpecHolder.DEFAULT
+  );
+  private final DataSegment segment2 = new DataSegment(
+      "test2",
+      Intervals.of("2011/2012"),
+      "version2",
+      null,
+      ImmutableList.of("dim1", "dim2"),
+      ImmutableList.of("met1", "met2"),
+      null,
+      1,
+      100L,
+      DataSegment.PruneLoadSpecHolder.DEFAULT
+  );
+  private final DataSegment segment3 = new DataSegment(
+      "test3",
+      Intervals.of("2012/2013"),
+      "version3",
+      null,
+      ImmutableList.of("dim1", "dim2"),
+      ImmutableList.of("met1", "met2"),
+      null,
+      1,
+      100L,
+      DataSegment.PruneLoadSpecHolder.DEFAULT
+  );
+  private final DataSegment segment4 = new DataSegment(
+      "test4",
+      Intervals.of("2017/2018"),
+      "version4",
+      null,
+      ImmutableList.of("dim1", "dim2"),
+      ImmutableList.of("met1", "met2"),
+      null,
+      1,
+      100L,
+      DataSegment.PruneLoadSpecHolder.DEFAULT
+  );
+  private final DataSegment segment5 = new DataSegment(
+      "test5",
+      Intervals.of("2017/2018"),
+      "version5",
+      null,
+      ImmutableList.of("dim1", "dim2"),
+      ImmutableList.of("met1", "met2"),
+      null,
+      1,
+      100L,
+      DataSegment.PruneLoadSpecHolder.DEFAULT
+  );
+
+  private final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
+  private final DirectDruidClient client1 = new DirectDruidClient(
+      new ReflectionQueryToolChestWarehouse(),
+      QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+      new DefaultObjectMapper(),
+      httpClient,
+      "http",
+      "foo",
+      new NoopServiceEmitter()
+  );
+  private final DirectDruidClient client2 = new DirectDruidClient(
+      new ReflectionQueryToolChestWarehouse(),
+      QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+      new DefaultObjectMapper(),
+      httpClient,
+      "http",
+      "foo2",
+      new NoopServiceEmitter()
+  );
+  private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
+      new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
+      1L,
+      null,
+      ImmutableMap.of("segment1", segment1, "segment2", segment2)
+  );
+
+  private final ImmutableDruidServer druidServer2 = new ImmutableDruidServer(
+      new DruidServerMetadata("server2", "server2:1234", null, 5L, 
ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
+      1L,
+      null,
+      ImmutableMap.of("segment2", segment2, "segment4", segment4, "segment5", 
segment5)
+  );
+
+  private final List<ImmutableDruidServer> immutableDruidServers = 
ImmutableList.of(druidServer1, druidServer2);
+
+  @Test
+  public void testGetTableMap()
+  {
+    Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), schema.getTableNames());
+
+    final Map<String, Table> tableMap = schema.getTableMap();
+    Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), tableMap.keySet());
+    final SystemSchema.SegmentsTable segmentsTable = (SystemSchema.SegmentsTable) schema.getTableMap().get("segments");
+    final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> fields = rowType.getFieldList();
+
+    Assert.assertEquals(13, fields.size());
+
+    final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) schema.getTableMap().get("tasks");
+    final RelDataType sysRowType = tasksTable.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> sysFields = sysRowType.getFieldList();
+    Assert.assertEquals(13, sysFields.size());
+
+    Assert.assertEquals("task_id", sysFields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, sysFields.get(0).getType().getSqlTypeName());
+
+    final SystemSchema.ServersTable serversTable = (SystemSchema.ServersTable) schema.getTableMap().get("servers");
+    final RelDataType serverRowType = serversTable.getRowType(new JavaTypeFactoryImpl());
+    final List<RelDataTypeField> serverFields = serverRowType.getFieldList();
+    Assert.assertEquals(8, serverFields.size());
+    Assert.assertEquals("server", serverFields.get(0).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, serverFields.get(0).getType().getSqlTypeName());
+  }
+
+  @Test
+  public void testSegmentsTable() throws Exception
+  {
+    // total segments = 6
+    // segments 1, 2, 3 are published and available
+    // segments 4, 5, 6 are published but unavailable
+    // segment 3 is published but not served
+    // segment 2 is served by 2 servers, so num_replicas=2
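+    // the first three rows correspond to the served segments 1-3 above; the
+    // last three to the published-but-unavailable segments in the JSON below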
+
+    final SystemSchema.SegmentsTable segmentsTable = EasyMock.createMockBuilder(SystemSchema.SegmentsTable.class).withConstructor(
+        druidSchema, client, mapper, responseHandler, authMapper).createMock();
+    EasyMock.replay(segmentsTable);
+
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments")).andReturn(request).anyTimes();
+    SettableFuture<InputStream> future = SettableFuture.create();
+    EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
+    final int ok = HttpServletResponse.SC_OK;
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+
+    EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")).anyTimes();
+
+    AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
+    //published but unavailable segments
+    final String json = "[{\n"
+                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"interval\": 
\"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+                        + "\t\"loadSpec\": {\n"
+                        + "\t\t\"type\": \"local\",\n"
+                        + "\t\t\"path\": 
\"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/51/1578eb79-0e44-4b41-a87b-65e40c52be53/index.zip\"\n"
+                        + "\t},\n"
+                        + "\t\"dimensions\": 
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,regionIsoCode,regionName,added,deleted,delta\",\n"
+                        + "\t\"metrics\": \"count,user_unique\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"partitionNum\": 51,\n"
+                        + "\t\t\"partitions\": 0\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 9,\n"
+                        + "\t\"size\": 47406,\n"
+                        + "\t\"identifier\": 
\"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"interval\": 
\"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n"
+                        + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n"
+                        + "\t\"loadSpec\": {\n"
+                        + "\t\t\"type\": \"local\",\n"
+                        + "\t\t\"path\": 
\"/var/druid/segments/wikipedia-kafka/2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z/2018-08-07T18:00:00.117Z/9/a2646827-b782-424c-9eed-e48aa448d2c5/index.zip\"\n"
+                        + "\t},\n"
+                        + "\t\"dimensions\": 
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n"
+                        + "\t\"metrics\": \"count,user_unique\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"partitionNum\": 9,\n"
+                        + "\t\t\"partitions\": 0\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 9,\n"
+                        + "\t\"size\": 83846,\n"
+                        + "\t\"identifier\": 
\"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n"
+                        + "}, {\n"
+                        + "\t\"dataSource\": \"wikipedia-kafka\",\n"
+                        + "\t\"interval\": 
\"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n"
+                        + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n"
+                        + "\t\"loadSpec\": {\n"
+                        + "\t\t\"type\": \"local\",\n"
+                        + "\t\t\"path\": 
\"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/50/87c5457e-c39b-4c03-9df8-e2b20b210dfc/index.zip\"\n"
+                        + "\t},\n"
+                        + "\t\"dimensions\": 
\"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n"
+                        + "\t\"metrics\": \"count,user_unique\",\n"
+                        + "\t\"shardSpec\": {\n"
+                        + "\t\t\"type\": \"none\",\n"
+                        + "\t\t\"partitionNum\": 50,\n"
+                        + "\t\t\"partitions\": 0\n"
+                        + "\t},\n"
+                        + "\t\"binaryVersion\": 9,\n"
+                        + "\t\"size\": 53527,\n"
+                        + "\t\"identifier\": 
\"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n"
+                        + "}]";
+    byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
+    in.add(bytesToWrite);
+    in.done();
+    future.set(in);
+
+    EasyMock.replay(client, request, responseHolder, responseHandler);
+    DataContext dataContext = new DataContext()
+    {
+      @Override
+      public SchemaPlus getRootSchema()
+      {
+        return null;
+      }
+
+      @Override
+      public JavaTypeFactory getTypeFactory()
+      {
+        return null;
+      }
+
+      @Override
+      public QueryProvider getQueryProvider()
+      {
+        return null;
+      }
+
+      @Override
+      public Object get(String name)
+      {
+        return CalciteTests.SUPER_USER_AUTH_RESULT;
+      }
+    };
+    Enumerable<Object[]> rows = segmentsTable.scan(dataContext);
+    Enumerator<Object[]> enumerator = rows.enumerator();
+
+    Assert.assertTrue(enumerator.moveNext());
+    Assert.assertTrue(enumerator.moveNext());
+    Object[] row2 = enumerator.current();
+    // segment 2 is published and has 2 replicas
+    Assert.assertEquals(1L, row2[9]);
+    Assert.assertEquals(2L, row2[7]);
+    Assert.assertTrue(enumerator.moveNext());
+    Assert.assertTrue(enumerator.moveNext());
+    Assert.assertTrue(enumerator.moveNext());
+    Assert.assertTrue(enumerator.moveNext());
+    Object[] row6 = enumerator.current();
+    // segment 6 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(1L, row6[9]);
+    Assert.assertEquals(0L, row6[7]);
+    Assert.assertFalse(enumerator.moveNext());
+  }
+
+  @Test
+  public void testServersTable()
+  {
+    SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class).withConstructor(serverView, authMapper).createMock();
+    EasyMock.replay(serversTable);
+
+    EasyMock.expect(serverView.getDruidServers())
+            .andReturn(immutableDruidServers)
+            .once();
+    EasyMock.replay(serverView);
+    DataContext dataContext = new DataContext()
+    {
+      @Override
+      public SchemaPlus getRootSchema()
+      {
+        return null;
+      }
+
+      @Override
+      public JavaTypeFactory getTypeFactory()
+      {
+        return null;
+      }
+
+      @Override
+      public QueryProvider getQueryProvider()
+      {
+        return null;
+      }
+
+      @Override
+      public Object get(String name)
+      {
+        return CalciteTests.SUPER_USER_AUTH_RESULT;
+      }
+    };
+    Enumerable<Object[]> rows = serversTable.scan(dataContext);
+    Assert.assertEquals(2, rows.count());
+    Object[] row1 = rows.first();
+    Assert.assertEquals("localhost:0000", row1[0]);
+    Assert.assertEquals("realtime", row1[4].toString());
+    Object[] row2 = rows.last();
+    Assert.assertEquals("server2:1234", row2[0]);
+    Assert.assertEquals("historical", row2[4].toString());
+  }
+
+  @Test
+  public void testTasksTable() throws Exception
+  {
+    SystemSchema.TasksTable tasksTable = EasyMock.createMockBuilder(SystemSchema.TasksTable.class)
+                                                 .withConstructor(client, mapper, responseHandler, authMapper)
+                                                 .createMock();
+    EasyMock.replay(tasksTable);
+    EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/tasks")).andReturn(request).anyTimes();
+    SettableFuture<InputStream> future = SettableFuture.create();
+    EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once();
+    final int ok = HttpServletResponse.SC_OK;
+    EasyMock.expect(responseHandler.getStatus()).andReturn(ok).once();
+    EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
+
+    AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();
+
+    String json = "[{\n"
+                  + "\t\"id\": \"index_wikipedia_2018-09-20T22:33:44.911Z\",\n"
+                  + "\t\"type\": \"index\",\n"
+                  + "\t\"createdTime\": \"2018-09-20T22:33:44.922Z\",\n"
+                  + "\t\"queueInsertionTime\": \"1970-01-01T00:00:00.000Z\",\n"
+                  + "\t\"statusCode\": \"FAILED\",\n"
+                  + "\t\"runnerStatusCode\": \"NONE\",\n"
+                  + "\t\"duration\": -1,\n"
+                  + "\t\"location\": {\n"
+                  + "\t\t\"host\": \"testHost\",\n"
+                  + "\t\t\"port\": 1234,\n"
+                  + "\t\t\"tlsPort\": -1\n"
+                  + "\t},\n"
+                  + "\t\"dataSource\": \"wikipedia\",\n"
+                  + "\t\"errorMsg\": null\n"
+                  + "}, {\n"
+                  + "\t\"id\": \"index_wikipedia_2018-09-21T18:38:47.773Z\",\n"
+                  + "\t\"type\": \"index\",\n"
+                  + "\t\"createdTime\": \"2018-09-21T18:38:47.873Z\",\n"
+                  + "\t\"queueInsertionTime\": \"2018-09-21T18:38:47.910Z\",\n"
+                  + "\t\"statusCode\": \"RUNNING\",\n"
+                  + "\t\"runnerStatusCode\": \"RUNNING\",\n"
+                  + "\t\"duration\": null,\n"
+                  + "\t\"location\": {\n"
+                  + "\t\t\"host\": \"192.168.1.6\",\n"
+                  + "\t\t\"port\": 8100,\n"
+                  + "\t\t\"tlsPort\": -1\n"
+                  + "\t},\n"
+                  + "\t\"dataSource\": \"wikipedia\",\n"
+                  + "\t\"errorMsg\": null\n"
+                  + "}]";
+    byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
+    in.add(bytesToWrite);
+    in.done();
+    future.set(in);
+
+    EasyMock.replay(client, request, responseHandler);
+    DataContext dataContext = new DataContext()
+    {
+      @Override
+      public SchemaPlus getRootSchema()
+      {
+        return null;
+      }
+
+      @Override
+      public JavaTypeFactory getTypeFactory()
+      {
+        return null;
+      }
+
+      @Override
+      public QueryProvider getQueryProvider()
+      {
+        return null;
+      }
+
+      @Override
+      public Object get(String name)
+      {
+        return CalciteTests.SUPER_USER_AUTH_RESULT;
+      }
+    };
+    Enumerable<Object[]> rows = tasksTable.scan(dataContext);
+    Enumerator<Object[]> enumerator = rows.enumerator();
+
+    Assert.assertTrue(enumerator.moveNext());
+    Object[] row1 = enumerator.current();
+    Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row1[0].toString());
+    Assert.assertEquals("FAILED", row1[5].toString());
+    Assert.assertEquals("NONE", row1[6].toString());
+    Assert.assertEquals(-1L, row1[7]);
+    Assert.assertEquals("testHost:1234", row1[8]);
+
+    Assert.assertTrue(enumerator.moveNext());
+    Object[] row2 = enumerator.current();
+    Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row2[0].toString());
+    Assert.assertEquals("RUNNING", row2[5].toString());
+    Assert.assertEquals("RUNNING", row2[6].toString());
+    Assert.assertNull(row2[7]);
+    Assert.assertEquals("192.168.1.6:8100", row2[8]);
+
+    Assert.assertFalse(enumerator.moveNext());
+  }
+
+}
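
Note: each of the three tests above builds the same anonymous DataContext, whose
only meaningful member is the auth result returned from get(). A minimal sketch
of a shared helper (hypothetical, not part of this patch) could read:

    private DataContext createDataContext(final Object authResult)
    {
      return new DataContext()
      {
        @Override
        public SchemaPlus getRootSchema()
        {
          return null;
        }

        @Override
        public JavaTypeFactory getTypeFactory()
        {
          return null;
        }

        @Override
        public QueryProvider getQueryProvider()
        {
          return null;
        }

        @Override
        public Object get(String name)
        {
          // the tests above always return CalciteTests.SUPER_USER_AUTH_RESULT here
          return authResult;
        }
      };
    }

Each scan call could then use createDataContext(CalciteTests.SUPER_USER_AUTH_RESULT).
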
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 4716b0d..a83fde5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -32,19 +32,24 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
+import org.apache.curator.x.discovery.ServiceProvider;
 import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.curator.discovery.ServerDiscoverySelector;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.guice.ExpressionModule;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
@@ -115,10 +120,12 @@ import org.apache.druid.sql.calcite.expression.builtin.LookupOperatorConversion;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.schema.DruidSchema;
+import org.apache.druid.sql.calcite.schema.SystemSchema;
 import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.sql.calcite.view.ViewManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.chrono.ISOChronology;
 
@@ -568,4 +575,30 @@ public class CalciteTests
         )
     ).get(0);
   }
+
+
+  public static SystemSchema createMockSystemSchema(
+      final DruidSchema druidSchema,
+      final SpecificSegmentsQuerySegmentWalker walker
+  )
+  {
+    final DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
+        EasyMock.createMock(HttpClient.class),
+        EasyMock.createMock(DruidNodeDiscoveryProvider.class),
+        "nodetype",
+        "/simple/leader",
+        new ServerDiscoverySelector(EasyMock.createMock(ServiceProvider.class), "test")
+    )
+    {
+    };
+    final SystemSchema schema = new SystemSchema(
+        druidSchema,
+        new TestServerInventoryView(walker.getSegments()),
+        TEST_AUTHORIZER_MAPPER,
+        druidLeaderClient,
+        druidLeaderClient,
+        getJsonMapper()
+    );
+    return schema;
+  }
 }
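
Note: a minimal usage sketch for the new helper (hypothetical, mirroring the
SystemSchemaTest setup shown earlier in this patch; assumes a walker and
conglomerate built as in SystemSchemaTest#setUp):

    final DruidSchema druidSchema = new DruidSchema(
        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
        new TestServerInventoryView(walker.getSegments()),
        PLANNER_CONFIG_DEFAULT,
        new NoopViewManager(),
        new NoopEscalator()
    );
    druidSchema.start();
    druidSchema.awaitInitialization();
    // the no-op DruidLeaderClient inside createMockSystemSchema stands in for
    // the coordinator and overlord clients
    final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker);
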
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index e46f3cb..e0b6825 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.util;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.query.DataSource;
@@ -30,9 +31,10 @@ import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.concurrent.Executor;
-
+// This class is used in tests and benchmarks.
 public class TestServerInventoryView implements TimelineServerView
 {
  private static final DruidServerMetadata DUMMY_SERVER = new DruidServerMetadata(
@@ -57,6 +59,13 @@ public class TestServerInventoryView implements TimelineServerView
     throw new UnsupportedOperationException();
   }
 
+  @Nullable
+  @Override
+  public List<ImmutableDruidServer> getDruidServers()
+  {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
  public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
   {

