This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/34.0.0 by this push:
     new eb7847c0d01 Add projection names to DataSegment in a backward 
compatible manner. (#18223) (#18287)
eb7847c0d01 is described below

commit eb7847c0d015c3215cfdb03c30dbeddf4227889f
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri Jul 18 15:03:21 2025 -0500

    Add projection names to DataSegment in a backward compatible manner. 
(#18223) (#18287)
    
    * Add projection names to DataSegment in a backward compatible manner.
    
    * fix build
    
    * fix build2
    
    * fix unit test, consolidate projection serde to always convert missing key 
& null value to empty collection.
    
    * revert some previous changes that converts null projections to empty
    
    * fix test
    
    * add projections to sys.segments
    
    * fix it
    
    * style, and unit test
    
    * update DataSchemaTest
    
    * IT test
    
    * fix ldap test
    
    * fix build
    
    * add sleep for flaky test
    
    * add test
    
    * small data segment change
    
    * small data segment change
    
    Co-authored-by: Cece Mei <[email protected]>
---
 .../msq/input/table/DataSegmentWithLocation.java   |  31 +-
 .../DataSegmentAndIndexZipFilePathTest.java        |  26 +-
 .../indexing/common/actions/TaskLocksTest.java     |  82 ++---
 .../concurrent/ConcurrentReplaceAndAppendTest.java |  14 +-
 .../ConcurrentReplaceAndStreamingAppendTest.java   |  14 +-
 .../druid/testsEx/auth/ITSecurityBasicQuery.java   |   3 +
 .../docker/test-data/ldap-security-sample-data.sql |   2 +-
 .../docker/test-data/security-sample-data.sql      |   2 +-
 .../results/auth_test_sys_schema_segments.json     |   1 +
 .../druid/jackson/CommaListJoinSerializer.java     |   7 +
 .../org/apache/druid/timeline/DataSegment.java     | 185 +++++++----
 .../org/apache/druid/timeline/DataSegmentTest.java | 143 +++++---
 .../druid/timeline/SegmentStatusInClusterTest.java |  39 ++-
 .../IndexerSQLMetadataStorageCoordinator.java      |  33 +-
 .../apache/druid/segment/indexing/DataSchema.java  |  13 +-
 .../realtime/appenderator/StreamAppenderator.java  |  50 ++-
 .../apache/druid/segment/realtime/sink/Sink.java   |  19 +-
 .../server/coordination/LoadableDataSegment.java   |  12 +-
 .../CachingClusteredClientCacheKeyManagerTest.java |  11 +-
 .../druid/client/CachingClusteredClientTest.java   |  86 +++--
 .../druid/client/ImmutableDruidDataSourceTest.java |  50 ++-
 .../druid/segment/indexing/DataSchemaTest.java     |  50 ++-
 ...CoordinatorSegmentDataCacheConcurrencyTest.java |  22 +-
 .../metadata/SegmentMetadataCacheTestBase.java     |  43 +--
 .../appenderator/StreamAppenderatorDriverTest.java |  19 +-
 .../BatchDataSegmentAnnouncerTest.java             |  29 +-
 .../java/org/apache/druid/cli/ExportMetadata.java  |  12 +-
 .../druid/sql/calcite/schema/SystemSchema.java     |   4 +
 .../BrokerSegmentMetadataCacheConcurrencyTest.java |  26 +-
 .../schema/BrokerSegmentMetadataCacheTest.java     |  52 +--
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 360 ++++++++-------------
 31 files changed, 694 insertions(+), 746 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
index 0e83e9c3ede..92d017a0dba 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
@@ -51,14 +51,11 @@ public class DataSegmentWithLocation extends DataSegment
       @JsonProperty("version") String version,
       // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization 
to prevent dependency pollution
       @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
-      @JsonProperty("dimensions")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
+      @JsonProperty("dimensions") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
       List<String> dimensions,
-      @JsonProperty("metrics")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-      List<String> metrics,
+      @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
+      List<String> projections,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -67,7 +64,20 @@ public class DataSegmentWithLocation extends DataSegment
       @JacksonInject PruneSpecsHolder pruneSpecsHolder
   )
   {
-    super(dataSource, interval, version, loadSpec, dimensions, metrics, 
shardSpec, lastCompactionState, binaryVersion, size, pruneSpecsHolder);
+    super(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        projections,
+        shardSpec,
+        lastCompactionState,
+        binaryVersion,
+        size,
+        pruneSpecsHolder
+    );
     this.servers = Preconditions.checkNotNull(servers, "servers");
   }
 
@@ -83,9 +93,12 @@ public class DataSegmentWithLocation extends DataSegment
         dataSegment.getLoadSpec(),
         dataSegment.getDimensions(),
         dataSegment.getMetrics(),
+        dataSegment.getProjections(),
         dataSegment.getShardSpec(),
+        null,
         dataSegment.getBinaryVersion(),
-        dataSegment.getSize()
+        dataSegment.getSize(),
+        PruneSpecsHolder.DEFAULT
     );
     this.servers = servers;
   }
diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
index 4eae8064c2d..aac1056def2 100644
--- 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
+++ 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
@@ -35,26 +35,12 @@ public class DataSegmentAndIndexZipFilePathTest
 {
   private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source", 
1);
   private static final SegmentId OTHER_SEGMENT_ID = 
SegmentId.dummy("data-source2", 1);
-  private static final DataSegment SEGMENT = new DataSegment(
-      SEGMENT_ID,
-      null,
-      null,
-      null,
-      new NumberedShardSpec(1, 10),
-      null,
-      0,
-      0
-  );
-  private static final DataSegment OTHER_SEGMENT = new DataSegment(
-      OTHER_SEGMENT_ID,
-      null,
-      null,
-      null,
-      new NumberedShardSpec(1, 10),
-      null,
-      0,
-      0
-  );
+  private static final DataSegment SEGMENT = DataSegment.builder(SEGMENT_ID)
+                                                        .shardSpec(new 
NumberedShardSpec(1, 10))
+                                                        .build();
+  private static final DataSegment OTHER_SEGMENT = 
DataSegment.builder(OTHER_SEGMENT_ID)
+                                                              .shardSpec(new 
NumberedShardSpec(1, 10))
+                                                              .build();
 
   private DataSegmentAndIndexZipFilePath target;
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
index 3fd3cd9cbd9..0d4ef09a789 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.joda.time.Interval;
@@ -77,28 +78,17 @@ public class TaskLocksTest
 
   private Set<DataSegment> createTimeChunkedSegments()
   {
+    final String version = DateTimes.nowUtc().toString();
     return ImmutableSet.of(
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(DateTimes.nowUtc().toString())
-            .shardSpec(new LinearShardSpec(2))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-02/2017-01-03"))
-            .version(DateTimes.nowUtc().toString())
-            .shardSpec(new LinearShardSpec(2))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-03/2017-01-04"))
-            .version(DateTimes.nowUtc().toString())
-            .shardSpec(new LinearShardSpec(2))
-            .size(0)
-            .build()
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new LinearShardSpec(2))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-02/2017-01-03"), version, null))
+                   .shardSpec(new LinearShardSpec(2))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-03/2017-01-04"), version, null))
+                   .shardSpec(new LinearShardSpec(2))
+                   .build()
     );
   }
 
@@ -106,41 +96,21 @@ public class TaskLocksTest
   {
     final String version = DateTimes.nowUtc().toString();
     return ImmutableSet.of(
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(version)
-            .shardSpec(new NumberedShardSpec(0, 0))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(version)
-            .shardSpec(new NumberedShardSpec(1, 0))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(version)
-            .shardSpec(new NumberedShardSpec(2, 0))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(version)
-            .shardSpec(new NumberedShardSpec(3, 0))
-            .size(0)
-            .build(),
-        new DataSegment.Builder()
-            .dataSource(task.getDataSource())
-            .interval(Intervals.of("2017-01-01/2017-01-02"))
-            .version(version)
-            .shardSpec(new NumberedShardSpec(4, 0))
-            .size(0)
-            .build()
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new NumberedShardSpec(0, 0))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new NumberedShardSpec(1, 0))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new NumberedShardSpec(2, 0))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new NumberedShardSpec(3, 0))
+                   .build(),
+        DataSegment.builder(SegmentId.of(task.getDataSource(), 
Intervals.of("2017-01-01/2017-01-02"), version, null))
+                   .shardSpec(new NumberedShardSpec(4, 0))
+                   .build()
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index be2d351f1c5..a6c0d772647 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -1221,16 +1221,10 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
   private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
   {
     final SegmentId id = pendingSegment.asSegmentId();
-    return new DataSegment(
-        id,
-        Collections.singletonMap(id.toString(), id.toString()),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        pendingSegment.getShardSpec(),
-        null,
-        0,
-        0
-    );
+    return DataSegment.builder(id)
+                      .loadSpec(Collections.singletonMap(id.toString(), 
id.toString()))
+                      .shardSpec(pendingSegment.getShardSpec())
+                      .build();
   }
 
   private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... 
expectedSegments)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
index 8f374ff16b5..868d8c15568 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -680,16 +680,10 @@ public class ConcurrentReplaceAndStreamingAppendTest 
extends IngestionTestBase
   private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
   {
     final SegmentId id = pendingSegment.asSegmentId();
-    return new DataSegment(
-        id,
-        Collections.singletonMap(id.toString(), id.toString()),
-        Collections.emptyList(),
-        Collections.emptyList(),
-        pendingSegment.getShardSpec(),
-        null,
-        0,
-        0
-    );
+    return DataSegment.builder(id)
+                      .loadSpec(Collections.singletonMap(id.toString(), 
id.toString()))
+                      .shardSpec(pendingSegment.getShardSpec())
+                      .build();
   }
 
   private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... 
expectedSegments)
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
index 399a5e3b798..3612faa9eb0 100644
--- 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
@@ -105,6 +105,9 @@ public class ITSecurityBasicQuery
     List<ResourceAction> permissions = ImmutableList.of();
     securityClient.setPermissionsToRole(ROLE_1, permissions);
 
+    // Allow permissions sync across cluster to avoid flakes
+    Thread.sleep(SYNC_SLEEP);
+
     String queryLocal =
         StringUtils.format(
             "INSERT INTO %s\n"
diff --git a/integration-tests/docker/test-data/ldap-security-sample-data.sql 
b/integration-tests/docker/test-data/ldap-security-sample-data.sql
index 732cc55d4a5..5e4774663c0 100644
--- a/integration-tests/docker/test-data/ldap-security-sample-data.sql
+++ b/integration-tests/docker/test-data/ldap-security-sample-data.sql
@@ -14,4 +14,4 @@
 -- limitations under the License.
 
 INSERT INTO druid_tasks (id, created_date, datasource, payload, 
status_payload, active) VALUES ('index_auth_test_2030-04-30T01:13:31.893Z', 
'2030-04-30T01:13:31.893Z', 'auth_test', 
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"created_date\":\"2030-04-30T01:13:31.893Z\",\"datasource\":\"auth_test\",\"active\":0}',
 
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"status\":\"SUCCESS\",\"duration\":1}',
 0);
-INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
 VALUES 
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
 [...]
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
 VALUES 
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
 [...]
diff --git a/integration-tests/docker/test-data/security-sample-data.sql 
b/integration-tests/docker/test-data/security-sample-data.sql
index 732cc55d4a5..5e4774663c0 100644
--- a/integration-tests/docker/test-data/security-sample-data.sql
+++ b/integration-tests/docker/test-data/security-sample-data.sql
@@ -14,4 +14,4 @@
 -- limitations under the License.
 
 INSERT INTO druid_tasks (id, created_date, datasource, payload, 
status_payload, active) VALUES ('index_auth_test_2030-04-30T01:13:31.893Z', 
'2030-04-30T01:13:31.893Z', 'auth_test', 
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"created_date\":\"2030-04-30T01:13:31.893Z\",\"datasource\":\"auth_test\",\"active\":0}',
 
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"status\":\"SUCCESS\",\"duration\":1}',
 0);
-INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
 VALUES 
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
 [...]
+INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
 VALUES 
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
 [...]
diff --git 
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
 
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index 1ce7b44bc61..1e3ae5ece90 100644
--- 
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ 
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -17,6 +17,7 @@
     "shard_spec": "{\"type\":\"none\"}",
     "dimensions": 
"[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
     "metrics": 
"[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
+    "projections": "[\"daily_added\",\"daily_unique_users\"]",
     "last_compaction_state": null,
     "replication_factor": 2
   }
diff --git 
a/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
 
b/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
index 2ef3835419d..00ed1c5e31f 100644
--- 
a/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
+++ 
b/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.List;
 
 /**
+ * A custom Jackson serializer that converts a list of objects into a 
comma-separated string.
  */
 public class CommaListJoinSerializer extends StdScalarSerializer<List<String>>
 {
@@ -43,4 +44,10 @@ public class CommaListJoinSerializer extends 
StdScalarSerializer<List<String>>
   {
     jgen.writeString(JOINER.join(value));
   }
+
+  @Override
+  public boolean isEmpty(SerializerProvider prov, List<String> value)
+  {
+    return value.isEmpty();
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java 
b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
index c0560a83985..4dc9b8edd94 100644
--- a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -45,11 +45,11 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+
 
 /**
  * Metadata of Druid's data segment. An immutable object.
- *
+ * <p>
  * DataSegment's equality ({@link #equals}/{@link #hashCode}) and {@link 
#compareTo} methods consider only the
  * {@link SegmentId} of the segment.
  */
@@ -86,6 +86,7 @@ public class DataSegment implements Comparable<DataSegment>, 
Overshadowable<Data
   private static final Interner<String> STRING_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<List<String>> DIMENSIONS_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<List<String>> METRICS_INTERNER = 
Interners.newWeakInterner();
+  private static final Interner<List<String>> PROJECTIONS_INTERNER = 
Interners.newWeakInterner();
   private static final Interner<CompactionState> COMPACTION_STATE_INTERNER = 
Interners.newWeakInterner();
   private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
       "load spec is pruned, because it's not needed on Brokers, but eats a lot 
of heap space",
@@ -98,6 +99,7 @@ public class DataSegment implements Comparable<DataSegment>, 
Overshadowable<Data
   private final Map<String, Object> loadSpec;
   private final List<String> dimensions;
   private final List<String> metrics;
+  private final List<String> projections;
   private final ShardSpec shardSpec;
 
   /**
@@ -111,7 +113,10 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
   private final CompactionState lastCompactionState;
   private final long size;
 
-  @VisibleForTesting
+  /**
+   * @deprecated use {@link #builder(SegmentId)} or {@link 
#builder(DataSegment)} instead.
+   */
+  @Deprecated
   public DataSegment(
       SegmentId segmentId,
       Map<String, Object> loadSpec,
@@ -130,13 +135,19 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         loadSpec,
         dimensions,
         metrics,
+        null,
         shardSpec,
         lastCompactionState,
         binaryVersion,
-        size
+        size,
+        PruneSpecsHolder.DEFAULT
     );
   }
 
+  /**
+   * @deprecated use {@link #builder(SegmentId)} or {@link 
#builder(DataSegment)} instead.
+   */
+  @Deprecated
   public DataSegment(
       String dataSource,
       Interval interval,
@@ -156,13 +167,19 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         loadSpec,
         dimensions,
         metrics,
+        null,
         shardSpec,
         null,
         binaryVersion,
-        size
+        size,
+        PruneSpecsHolder.DEFAULT
     );
   }
 
+  /**
+   * @deprecated use {@link #builder(SegmentId)} or {@link 
#builder(DataSegment)} instead.
+   */
+  @Deprecated
   public DataSegment(
       String dataSource,
       Interval interval,
@@ -183,6 +200,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
         loadSpec,
         dimensions,
         metrics,
+        null,
         shardSpec,
         lastCompactionState,
         binaryVersion,
@@ -198,14 +216,11 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       @JsonProperty("version") String version,
       // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization 
to prevent dependency pollution
       @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
-      @JsonProperty("dimensions")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-          List<String> dimensions,
-      @JsonProperty("metrics")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-          List<String> metrics,
+      @JsonProperty("dimensions") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
+      List<String> dimensions,
+      @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
+      List<String> projections,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -216,10 +231,11 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     this.id = SegmentId.of(dataSource, interval, version, shardSpec);
     // prune loadspec if needed
     this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC : 
prepareLoadSpec(loadSpec);
-    // Deduplicating dimensions and metrics lists as a whole because they are 
very likely the same for the same
-    // dataSource
-    this.dimensions = prepareDimensionsOrMetrics(dimensions, 
DIMENSIONS_INTERNER);
-    this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER);
+    this.dimensions = dimensions == null ? ImmutableList.of() : 
prepareWithInterner(dimensions, DIMENSIONS_INTERNER);
+    this.metrics = metrics == null ? ImmutableList.of() : 
prepareWithInterner(metrics, METRICS_INTERNER);
+    // A null value for projections means that this segment is not aware of 
projections (launched in druid 32).
+    // An empty list means that this segment is projection-aware, but has no 
projections.
+    this.projections = projections == null ? null : 
prepareWithInterner(projections, PROJECTIONS_INTERNER);
     this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : 
shardSpec;
     this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState
                                ? null
@@ -229,46 +245,6 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     this.size = size;
   }
 
-  @Nullable
-  private Map<String, Object> prepareLoadSpec(@Nullable Map<String, Object> 
loadSpec)
-  {
-    if (loadSpec == null) {
-      return null;
-    }
-    // Load spec is just of 3 entries on average; HashMap/LinkedHashMap 
consumes much more memory than ArrayMap
-    Map<String, Object> result = new Object2ObjectArrayMap<>(loadSpec.size());
-    for (Map.Entry<String, Object> e : loadSpec.entrySet()) {
-      result.put(STRING_INTERNER.intern(e.getKey()), e.getValue());
-    }
-    return result;
-  }
-
-  @Nullable
-  private CompactionState prepareCompactionState(@Nullable CompactionState 
lastCompactionState)
-  {
-    if (lastCompactionState == null) {
-      return null;
-    }
-    return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
-  }
-
-  private List<String> prepareDimensionsOrMetrics(@Nullable List<String> list, 
Interner<List<String>> interner)
-  {
-    if (list == null) {
-      return ImmutableList.of();
-    } else {
-      List<String> result = list
-          .stream()
-          .filter(s -> !Strings.isNullOrEmpty(s))
-          // dimensions & metrics are stored as canonical string values to 
decrease memory required for storing
-          // large numbers of segments.
-          .map(STRING_INTERNER::intern)
-          // TODO replace with ImmutableList.toImmutableList() when updated to 
Guava 21+
-          .collect(Collectors.collectingAndThen(Collectors.toList(), 
ImmutableList::copyOf));
-      return interner.intern(result);
-    }
-  }
-
   /**
    * Get dataSource
    *
@@ -314,6 +290,15 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     return metrics;
   }
 
+  @Nullable
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @JsonSerialize(using = CommaListJoinSerializer.class)
+  public List<String> getProjections()
+  {
+    return projections;
+  }
+
   @JsonProperty
   public ShardSpec getShardSpec()
   {
@@ -416,6 +401,11 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     return builder(this).metrics(metrics).build();
   }
 
+  public DataSegment withProjections(List<String> projections)
+  {
+    return builder(this).projections(projections).build();
+  }
+
   public DataSegment withShardSpec(ShardSpec newSpec)
   {
     return builder(this).shardSpec(newSpec).build();
@@ -471,17 +461,65 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
            ", loadSpec=" + loadSpec +
            ", dimensions=" + dimensions +
            ", metrics=" + metrics +
+           ", projections=" + projections +
            ", shardSpec=" + shardSpec +
            ", lastCompactionState=" + lastCompactionState +
            ", size=" + size +
            '}';
   }
 
+
+  @Nullable
+  private static Map<String, Object> prepareLoadSpec(@Nullable Map<String, 
Object> loadSpec)
+  {
+    if (loadSpec == null) {
+      return null;
+    }
+    // Load spec is just of 3 entries on average; HashMap/LinkedHashMap 
consumes much more memory than ArrayMap
+    Map<String, Object> result = new Object2ObjectArrayMap<>(loadSpec.size());
+    for (Map.Entry<String, Object> e : loadSpec.entrySet()) {
+      result.put(STRING_INTERNER.intern(e.getKey()), e.getValue());
+    }
+    return result;
+  }
+
+  @Nullable
+  private static CompactionState prepareCompactionState(@Nullable 
CompactionState lastCompactionState)
+  {
+    if (lastCompactionState == null) {
+      return null;
+    }
+    return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
+  }
+
+  /**
+   * Returns a list of strings with all empty strings removed and all strings 
interned.
+   * <p>
+   * The dimensions, metrics, and projections are stored as canonical string 
values to decrease memory required for
+   * storing large numbers of segments.
+   */
+  private static List<String> prepareWithInterner(List<String> list, 
Interner<List<String>> interner)
+  {
+    return interner.intern(list.stream()
+                               .filter(s -> !Strings.isNullOrEmpty(s))
+                               .map(STRING_INTERNER::intern)
+                               .collect(ImmutableList.toImmutableList()));
+  }
+
+  /**
+   * @deprecated use {@link #builder(SegmentId)} or {@link 
#builder(DataSegment)} instead.
+   */
+  @Deprecated
   public static Builder builder()
   {
     return new Builder();
   }
 
+  public static Builder builder(SegmentId segmentId)
+  {
+    return new Builder(segmentId);
+  }
+
   public static Builder builder(DataSegment segment)
   {
     return new Builder(segment);
@@ -495,21 +533,39 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
     private Map<String, Object> loadSpec;
     private List<String> dimensions;
     private List<String> metrics;
+    private List<String> projections;
     private ShardSpec shardSpec;
     private CompactionState lastCompactionState;
     private Integer binaryVersion;
     private long size;
 
-    public Builder()
+    /**
+     * @deprecated use {@link #Builder(SegmentId)} or {@link 
#Builder(DataSegment)} instead.
+     */
+    @Deprecated
+    private Builder()
     {
       this.loadSpec = ImmutableMap.of();
       this.dimensions = ImmutableList.of();
       this.metrics = ImmutableList.of();
+      // By default, segment is not projection-aware.
+      this.projections = null;
       this.shardSpec = new NumberedShardSpec(0, 1);
       this.size = -1;
     }
 
-    public Builder(DataSegment segment)
+    private Builder(SegmentId segmentId)
+    {
+      this.dataSource = segmentId.getDataSource();
+      this.interval = segmentId.getInterval();
+      this.version = segmentId.getVersion();
+      this.shardSpec = new NumberedShardSpec(0, 1);
+      this.binaryVersion = 0;
+      this.size = 0;
+      this.lastCompactionState = null;
+    }
+
+    private Builder(DataSegment segment)
     {
       this.dataSource = segment.getDataSource();
       this.interval = segment.getInterval();
@@ -517,6 +573,7 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       this.loadSpec = segment.getLoadSpec();
       this.dimensions = segment.getDimensions();
       this.metrics = segment.getMetrics();
+      this.projections = segment.getProjections();
       this.shardSpec = segment.getShardSpec();
       this.lastCompactionState = segment.getLastCompactionState();
       this.binaryVersion = segment.getBinaryVersion();
@@ -559,6 +616,12 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
       return this;
     }
 
+    public Builder projections(List<String> projections)
+    {
+      this.projections = projections;
+      return this;
+    }
+
     public Builder shardSpec(ShardSpec shardSpec)
     {
       this.shardSpec = shardSpec;
@@ -598,10 +661,12 @@ public class DataSegment implements 
Comparable<DataSegment>, Overshadowable<Data
           loadSpec,
           dimensions,
           metrics,
+          projections,
           shardSpec,
           lastCompactionState,
           binaryVersion,
-          size
+          size,
+          PruneSpecsHolder.DEFAULT
       );
     }
   }
diff --git 
a/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java 
b/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 1f0a4f68ff0..54caacdf48b 100644
--- a/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -57,6 +57,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 /**
+ * Unit tests for {@link DataSegment}, covering construction, serialization, 
and utility methods.
  */
 public class DataSegmentTest
 {
@@ -114,6 +115,62 @@ public class DataSegmentTest
     MAPPER.setInjectableValues(injectableValues);
   }
 
+  @Test
+  public void testSerializationWithProjections() throws Exception
+  {
+    // arrange
+    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+    final ShardSpec shardSpec = new NumberedShardSpec(3, 0);
+    final SegmentId segmentId = SegmentId.of("something", interval, "1", 
shardSpec);
+
+    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", 
"or_other");
+    final CompactionState compactionState = new CompactionState(
+        new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+        new 
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", 
"foo"))),
+        ImmutableList.of(new CountAggregatorFactory("count")),
+        new CompactionTransformSpec(new SelectorDimFilter("dim1", "foo", 
null)),
+        MAPPER.convertValue(ImmutableMap.of(), IndexSpec.class),
+        MAPPER.convertValue(ImmutableMap.of(), GranularitySpec.class),
+        null
+    );
+    final DataSegment segment = DataSegment.builder(segmentId)
+                                           .loadSpec(loadSpec)
+                                           .dimensions(Arrays.asList("dim1", 
"dim2"))
+                                           .metrics(Arrays.asList("met1", 
"met2"))
+                                           .projections(Arrays.asList("proj1", 
"proj2"))
+                                           .shardSpec(shardSpec)
+                                           
.lastCompactionState(compactionState)
+                                           .binaryVersion(TEST_VERSION)
+                                           .size(1)
+                                           .build();
+    // act & assert
+    final Map<String, Object> objectMap = MAPPER.readValue(
+        MAPPER.writeValueAsString(segment),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+    Assert.assertEquals(12, objectMap.size());
+    Assert.assertEquals("something", objectMap.get("dataSource"));
+    Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+    Assert.assertEquals("1", objectMap.get("version"));
+    Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+    Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+    Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+    Assert.assertEquals("proj1,proj2", objectMap.get("projections"));
+    Assert.assertEquals(
+        ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 
0),
+        objectMap.get("shardSpec")
+    );
+    Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+    Assert.assertEquals(1, objectMap.get("size"));
+    Assert.assertEquals(6, ((Map) 
objectMap.get("lastCompactionState")).size());
+    // another act & assert
+    DataSegment deserializedSegment = 
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
+    assertAllFieldsEquals(segment, deserializedSegment);
+    Assert.assertEquals(0, segment.compareTo(deserializedSegment));
+    Assert.assertEquals(0, deserializedSegment.compareTo(segment));
+    Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
+  }
+
   @Test
   public void testV1Serialization() throws Exception
   {
@@ -156,31 +213,18 @@ public class DataSegmentTest
     Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
     Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
     Assert.assertEquals("met1,met2", objectMap.get("metrics"));
-    Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, 
"partitions", 0), objectMap.get("shardSpec"));
+    Assert.assertEquals(
+        ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 
0),
+        objectMap.get("shardSpec")
+    );
     Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
     Assert.assertEquals(1, objectMap.get("size"));
     Assert.assertEquals(6, ((Map) 
objectMap.get("lastCompactionState")).size());
 
     DataSegment deserializedSegment = 
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
-
-    Assert.assertEquals(segment.getDataSource(), 
deserializedSegment.getDataSource());
-    Assert.assertEquals(segment.getInterval(), 
deserializedSegment.getInterval());
-    Assert.assertEquals(segment.getVersion(), 
deserializedSegment.getVersion());
-    Assert.assertEquals(segment.getLoadSpec(), 
deserializedSegment.getLoadSpec());
-    Assert.assertEquals(segment.getDimensions(), 
deserializedSegment.getDimensions());
-    Assert.assertEquals(segment.getMetrics(), 
deserializedSegment.getMetrics());
-    Assert.assertEquals(segment.getShardSpec(), 
deserializedSegment.getShardSpec());
-    Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
-    Assert.assertEquals(segment.getId(), deserializedSegment.getId());
-    Assert.assertEquals(segment.getLastCompactionState(), 
deserializedSegment.getLastCompactionState());
-
-    deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), 
DataSegment.class);
+    assertAllFieldsEquals(segment, deserializedSegment);
     Assert.assertEquals(0, segment.compareTo(deserializedSegment));
-
-    deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), 
DataSegment.class);
     Assert.assertEquals(0, deserializedSegment.compareTo(segment));
-
-    deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), 
DataSegment.class);
     Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
   }
 
@@ -251,22 +295,16 @@ public class DataSegmentTest
     Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
     Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
     Assert.assertEquals("met1,met2", objectMap.get("metrics"));
-    Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, 
"partitions", 0), objectMap.get("shardSpec"));
+    Assert.assertEquals(
+        ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 
0),
+        objectMap.get("shardSpec")
+    );
     Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
     Assert.assertEquals(1, objectMap.get("size"));
     Assert.assertEquals(3, ((Map) 
objectMap.get("lastCompactionState")).size());
 
     DataSegment deserializedSegment = 
MAPPER.readValue(lastCompactionStateWithNullSpecs, DataSegment.class);
-    Assert.assertEquals(segment.getDataSource(), 
deserializedSegment.getDataSource());
-    Assert.assertEquals(segment.getInterval(), 
deserializedSegment.getInterval());
-    Assert.assertEquals(segment.getVersion(), 
deserializedSegment.getVersion());
-    Assert.assertEquals(segment.getLoadSpec(), 
deserializedSegment.getLoadSpec());
-    Assert.assertEquals(segment.getDimensions(), 
deserializedSegment.getDimensions());
-    Assert.assertEquals(segment.getMetrics(), 
deserializedSegment.getMetrics());
-    Assert.assertEquals(segment.getShardSpec(), 
deserializedSegment.getShardSpec());
-    Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
-    Assert.assertEquals(segment.getId(), deserializedSegment.getId());
-    Assert.assertEquals(segment.getLastCompactionState(), 
deserializedSegment.getLastCompactionState());
+    assertAllFieldsEquals(segment, deserializedSegment);
     Assert.assertNotNull(segment.getLastCompactionState());
     Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec());
     Assert.assertNull(segment.getLastCompactionState().getTransformSpec());
@@ -274,13 +312,8 @@ public class DataSegmentTest
     Assert.assertNotNull(deserializedSegment.getLastCompactionState());
     
Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec());
 
-    deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs, 
DataSegment.class);
     Assert.assertEquals(0, segment.compareTo(deserializedSegment));
-
-    deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs, 
DataSegment.class);
     Assert.assertEquals(0, deserializedSegment.compareTo(segment));
-
-    deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs, 
DataSegment.class);
     Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
   }
 
@@ -338,8 +371,12 @@ public class DataSegmentTest
   @Test
   public void testV1SerializationNullMetrics() throws Exception
   {
-    final DataSegment segment =
-        makeDataSegment("foo", "2012-01-01/2012-01-02", 
DateTimes.of("2012-01-01T11:22:33.444Z").toString());
+    final DataSegment segment = DataSegment.builder(SegmentId.of(
+        "foo",
+        Intervals.of("2012-01-01/2012-01-02"),
+        DateTimes.of("2012-01-01T11:22:33.444Z").toString(),
+        null
+    )).size(1).build();
 
     final DataSegment segment2 = 
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
     Assert.assertEquals("empty dimensions", ImmutableList.of(), 
segment2.getDimensions());
@@ -367,12 +404,12 @@ public class DataSegmentTest
                                             
.lastCompactionState(compactionState)
                                             .build();
     final DataSegment segment2 = DataSegment.builder()
-                                           .dataSource("foo")
-                                           
.interval(Intervals.of("2012-01-01/2012-01-02"))
-                                           
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
-                                           .shardSpec(getShardSpec(7))
-                                           .size(0)
-                                           .build();
+                                            .dataSource("foo")
+                                            
.interval(Intervals.of("2012-01-01/2012-01-02"))
+                                            
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+                                            .shardSpec(getShardSpec(7))
+                                            .size(0)
+                                            .build();
     Assert.assertEquals(segment1, 
segment2.withLastCompactionState(compactionState));
   }
 
@@ -409,7 +446,7 @@ public class DataSegmentTest
             transformSpec,
             indexSpec,
             granularitySpec,
-            null
+            ImmutableList.of()
         );
 
     final DataSegment segment1 = DataSegment.builder()
@@ -476,13 +513,19 @@ public class DataSegmentTest
 
   }
 
-  private DataSegment makeDataSegment(String dataSource, String interval, 
String version)
+  private static void assertAllFieldsEquals(DataSegment segment1, DataSegment 
segment2)
   {
-    return DataSegment.builder()
-                      .dataSource(dataSource)
-                      .interval(Intervals.of(interval))
-                      .version(version)
-                      .size(1)
-                      .build();
+    Assert.assertEquals(segment1.getDataSource(), segment2.getDataSource());
+    Assert.assertEquals(segment1.getInterval(), segment2.getInterval());
+    Assert.assertEquals(segment1.getVersion(), segment2.getVersion());
+    Assert.assertEquals(segment1.getLoadSpec(), segment2.getLoadSpec());
+    Assert.assertEquals(segment1.getDimensions(), segment2.getDimensions());
+    Assert.assertEquals(segment1.getMetrics(), segment2.getMetrics());
+    Assert.assertEquals(segment1.getProjections(), segment2.getProjections());
+    Assert.assertEquals(segment1.getShardSpec(), segment2.getShardSpec());
+    Assert.assertEquals(segment1.getSize(), segment2.getSize());
+    Assert.assertEquals(segment1.getBinaryVersion(), 
segment2.getBinaryVersion());
+    Assert.assertEquals(segment1.getId(), segment2.getId());
+    Assert.assertEquals(segment1.getLastCompactionState(), 
segment2.getLastCompactionState());
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
 
b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
index cc08f72ab02..5293f939902 100644
--- 
a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
@@ -65,19 +65,15 @@ public class SegmentStatusInClusterTest
 
   private static SegmentStatusInCluster createSegmentForTest()
   {
-    DataSegment dataSegment = new DataSegment(
-        "something",
-        INTERVAL,
-        "1",
-        LOAD_SPEC,
-        Arrays.asList("dim1", "dim2"),
-        Arrays.asList("met1", "met2"),
-        NoneShardSpec.instance(),
-        null,
-        TEST_VERSION,
-        1
-    );
-
+    DataSegment dataSegment = DataSegment.builder(SegmentId.of("something", 
INTERVAL, "1", null))
+                                         .shardSpec(NoneShardSpec.instance())
+                                         .dimensions(Arrays.asList("dim1", 
"dim2"))
+                                         .metrics(Arrays.asList("met1", 
"met2"))
+                                         .projections(Arrays.asList("proj1", 
"proj2"))
+                                         .loadSpec(LOAD_SPEC)
+                                         .binaryVersion(TEST_VERSION)
+                                         .size(1)
+                                         .build();
     return new SegmentStatusInCluster(dataSegment, OVERSHADOWED, 
REPLICATION_FACTOR, NUM_ROWS, REALTIME);
   }
 
@@ -89,13 +85,14 @@ public class SegmentStatusInClusterTest
         JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
     );
 
-    Assert.assertEquals(14, objectMap.size());
+    Assert.assertEquals(15, objectMap.size());
     Assert.assertEquals("something", objectMap.get("dataSource"));
     Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
     Assert.assertEquals("1", objectMap.get("version"));
     Assert.assertEquals(LOAD_SPEC, objectMap.get("loadSpec"));
     Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
     Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+    Assert.assertEquals("proj1,proj2", objectMap.get("projections"));
     Assert.assertEquals(ImmutableMap.of("type", "none"), 
objectMap.get("shardSpec"));
     Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
     Assert.assertEquals(1, objectMap.get("size"));
@@ -118,6 +115,7 @@ public class SegmentStatusInClusterTest
     Assert.assertEquals(dataSegment.getLoadSpec(), 
deserializedSegment.getLoadSpec());
     Assert.assertEquals(dataSegment.getDimensions(), 
deserializedSegment.getDimensions());
     Assert.assertEquals(dataSegment.getMetrics(), 
deserializedSegment.getMetrics());
+    Assert.assertEquals(dataSegment.getProjections(), 
deserializedSegment.getProjections());
     Assert.assertEquals(dataSegment.getShardSpec(), 
deserializedSegment.getShardSpec());
     Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
     Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
@@ -155,11 +153,10 @@ class TestSegment extends DataSegment
       @JsonProperty("dimensions")
       @JsonDeserialize(using = CommaListJoinDeserializer.class)
       @Nullable
-          List<String> dimensions,
-      @JsonProperty("metrics")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-          List<String> metrics,
+      List<String> dimensions,
+      @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable
+      List<String> projections,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lasCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -175,10 +172,12 @@ class TestSegment extends DataSegment
         loadSpec,
         dimensions,
         metrics,
+        projections,
         shardSpec,
         lastCompactionState,
         binaryVersion,
-        size
+        size,
+        PruneSpecsHolder.DEFAULT
     );
     this.overshadowed = overshadowed;
     this.replicationFactor = replicationFactor;
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index a2c5956772f..4ff74ea1f44 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1203,18 +1203,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             final SegmentId newVersionSegmentId = 
pendingSegment.getId().asSegmentId();
             newVersionSegmentToParent.put(newVersionSegmentId, 
oldSegment.getId());
             upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), 
oldSegment.getId().toString());
-            allSegmentsToInsert.add(
-                new DataSegment(
-                    pendingSegment.getId().asSegmentId(),
-                    oldSegment.getLoadSpec(),
-                    oldSegment.getDimensions(),
-                    oldSegment.getMetrics(),
-                    pendingSegment.getId().getShardSpec(),
-                    oldSegment.getLastCompactionState(),
-                    oldSegment.getBinaryVersion(),
-                    oldSegment.getSize()
-                )
-            );
+            allSegmentsToInsert.add(DataSegment.builder(oldSegment)
+                                               
.interval(newVersionSegmentId.getInterval())
+                                               
.version(newVersionSegmentId.getVersion())
+                                               
.shardSpec(pendingSegment.getId().getShardSpec())
+                                               .build());
           }
         }
     );
@@ -2548,18 +2541,10 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     Set<DataSegment> segmentsWithAllocationInfo = new HashSet<>();
     for (SegmentId id : overlappingSegmentIds) {
       final int corePartitions = 
versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval());
-      segmentsWithAllocationInfo.add(
-          new DataSegment(
-              id,
-              null,
-              null,
-              null,
-              new NumberedShardSpec(id.getPartitionNum(), corePartitions),
-              null,
-              null,
-              1
-          )
-      );
+      segmentsWithAllocationInfo.add(DataSegment.builder(id)
+                                                .shardSpec(new 
NumberedShardSpec(id.getPartitionNum(), corePartitions))
+                                                .size(1)
+                                                .build());
     }
     return segmentsWithAllocationInfo;
   }
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java 
b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 6c35cb8d391..de951618df2 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -88,6 +88,7 @@ public class DataSchema
 
   // This is used for backward compatibility
   private InputRowParser inputRowParser;
+  @Nullable
   private List<AggregateProjectionSpec> projections;
 
   @JsonCreator
@@ -186,7 +187,7 @@ public class DataSchema
 
   /**
    * Computes the set of field names that are specified by the provided 
dimensions and aggregator lists.
-   *
+   * <p>
    * If either list is null, it is ignored.
    *
    * @throws IllegalArgumentException if there are duplicate field names, or 
if any dimension or aggregator
@@ -357,11 +358,21 @@ public class DataSchema
 
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
   public List<AggregateProjectionSpec> getProjections()
   {
     return projections;
   }
 
+  @Nullable
+  public List<String> getProjectionNames()
+  {
+    if (projections == null) {
+      return null;
+    }
+    return 
projections.stream().map(AggregateProjectionSpec::getName).collect(Collectors.toList());
+  }
+
   @Deprecated
   @JsonProperty("parser")
   @Nullable
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 396474bd29d..0d41f396f0e 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -205,10 +205,10 @@ public class StreamAppenderator implements Appenderator
 
   /**
    * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
-   *
+   * <p>
    * The sinkTimeline is set to the sink timeline of the provided 
SinkQuerySegmentWalker.
    * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
-   *
+   * <p>
    * It is used by UnifiedIndexerAppenderatorsManager which allows queries on 
data associated with multiple
    * Appenderators.
    */
@@ -868,7 +868,6 @@ public class StreamAppenderator implements Appenderator
    * @param identifier    sink identifier
    * @param sink          sink to push
    * @param useUniquePath true if the segment should be written to a path with 
a unique identifier
-   *
    * @return segment descriptor, or null if the sink is no longer valid
    */
   @Nullable
@@ -1115,7 +1114,7 @@ public class StreamAppenderator implements Appenderator
    * Unannounces the given base segment and all its upgraded versions.
    *
    * @param baseSegment base segment
-   * @param sink sink corresponding to the base segment
+   * @param sink        sink corresponding to the base segment
    * @return the set of all segment ids associated with the base segment 
containing the upgraded ids and itself.
    */
   private Set<SegmentIdWithShardSpec> 
unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
@@ -1128,17 +1127,15 @@ public class StreamAppenderator implements Appenderator
 
       final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment = 
baseSegmentToUpgradedSegments.remove(baseId);
       for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
-        final DataSegment newSegment = new DataSegment(
-            newId.getDataSource(),
-            newId.getInterval(),
-            newId.getVersion(),
-            baseSegment.getLoadSpec(),
-            baseSegment.getDimensions(),
-            baseSegment.getMetrics(),
-            newId.getShardSpec(),
-            baseSegment.getBinaryVersion(),
-            baseSegment.getSize()
-        );
+        final DataSegment newSegment = DataSegment.builder(newId.asSegmentId())
+                                                  
.shardSpec(newId.getShardSpec())
+                                                  
.loadSpec(baseSegment.getLoadSpec())
+                                                  
.dimensions(baseSegment.getDimensions())
+                                                  
.metrics(baseSegment.getMetrics())
+                                                  
.projections(baseSegment.getProjections())
+                                                  
.binaryVersion(baseSegment.getBinaryVersion())
+                                                  .size(baseSegment.getSize())
+                                                  .build();
         unannounceSegment(newSegment);
         upgradedSegmentToBaseSegment.remove(newId);
       }
@@ -1182,17 +1179,15 @@ public class StreamAppenderator implements Appenderator
 
   private DataSegment getUpgradedSegment(DataSegment baseSegment, 
SegmentIdWithShardSpec upgradedVersion)
   {
-    return new DataSegment(
-        upgradedVersion.getDataSource(),
-        upgradedVersion.getInterval(),
-        upgradedVersion.getVersion(),
-        baseSegment.getLoadSpec(),
-        baseSegment.getDimensions(),
-        baseSegment.getMetrics(),
-        upgradedVersion.getShardSpec(),
-        baseSegment.getBinaryVersion(),
-        baseSegment.getSize()
-    );
+    return DataSegment.builder(upgradedVersion.asSegmentId())
+                      .shardSpec(upgradedVersion.getShardSpec())
+                      .loadSpec(baseSegment.getLoadSpec())
+                      .dimensions(baseSegment.getDimensions())
+                      .metrics(baseSegment.getMetrics())
+                      .projections(baseSegment.getProjections())
+                      .binaryVersion(baseSegment.getBinaryVersion())
+                      .size(baseSegment.getSize())
+                      .build();
   }
 
   private void lockBasePersistDirectory()
@@ -1433,7 +1428,7 @@ public class StreamAppenderator implements Appenderator
    * Update the state of the appenderator when adding a sink.
    *
    * @param identifier sink identifier
-   * @param sink sink to be added
+   * @param sink       sink to be added
    */
   private void addSink(SegmentIdWithShardSpec identifier, Sink sink)
   {
@@ -1622,7 +1617,6 @@ public class StreamAppenderator implements Appenderator
    *
    * @param indexToPersist hydrant to persist
    * @param identifier     the segment this hydrant is going to be part of
-   *
    * @return the number of rows persisted
    */
   private int persistHydrant(FireHydrant indexToPersist, 
SegmentIdWithShardSpec identifier)
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java 
b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 45803107f18..87b2ab9cc52 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.sink;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -47,6 +46,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.Overshadowable;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.utils.CloseableUtils;
 import org.joda.time.Interval;
@@ -249,17 +249,12 @@ public class Sink implements Iterable<FireHydrant>, 
Overshadowable<Sink>
 
   public DataSegment getSegment()
   {
-    return new DataSegment(
-        schema.getDataSource(),
-        interval,
-        version,
-        ImmutableMap.of(),
-        Collections.emptyList(),
-        Lists.transform(Arrays.asList(schema.getAggregators()), 
AggregatorFactory::getName),
-        shardSpec,
-        null,
-        0
-    );
+    return DataSegment.builder(SegmentId.of(schema.getDataSource(), interval, 
version, shardSpec))
+                      .shardSpec(shardSpec)
+                      .dimensions(null)
+                      
.metrics(Lists.transform(Arrays.asList(schema.getAggregators()), 
AggregatorFactory::getName))
+                      .projections(schema.getProjectionNames())
+                      .build();
   }
 
   public int getNumRows()
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
 
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
index 2a633b6ad27..486bbf20bbb 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -47,14 +47,9 @@ public class LoadableDataSegment extends DataSegment
       @JsonProperty("version") String version,
       // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization 
to prevent dependency pollution
       @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
-      @JsonProperty("dimensions")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-      List<String> dimensions,
-      @JsonProperty("metrics")
-      @JsonDeserialize(using = CommaListJoinDeserializer.class)
-      @Nullable
-      List<String> metrics,
+      @JsonProperty("dimensions") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> dimensions,
+      @JsonProperty("metrics") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+      @JsonProperty("projections") @JsonDeserialize(using = 
CommaListJoinDeserializer.class) @Nullable List<String> projections,
       @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
       @JsonProperty("lastCompactionState") @Nullable CompactionState 
lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
@@ -68,6 +63,7 @@ public class LoadableDataSegment extends DataSegment
         loadSpec,
         dimensions,
         metrics,
+        projections,
         shardSpec,
         lastCompactionState,
         binaryVersion,
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
index c27a0537ae2..9e043d175d8 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
@@ -293,16 +293,7 @@ public class CachingClusteredClientCacheKeyManagerTest 
extends EasyMockSupport
     QueryableDruidServer queryableDruidServer = 
mock(QueryableDruidServer.class);
     DruidServer server = mock(DruidServer.class);
     SegmentId segmentId = SegmentId.dummy("data-source", partitionNumber);
-    DataSegment segment = new DataSegment(
-        segmentId,
-        null,
-        null,
-        null,
-        new NumberedShardSpec(partitionNumber, 10),
-        null,
-        0,
-        0
-    );
+    DataSegment segment = DataSegment.builder(segmentId).shardSpec(new 
NumberedShardSpec(partitionNumber, 10)).build();
     
expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes();
     expect(serverSelector.pick(query, 
CloneQueryMode.EXCLUDECLONES)).andReturn(queryableDruidServer).anyTimes();
     expect(queryableDruidServer.getServer()).andReturn(server).anyTimes();
diff --git 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index d0ae542614c..1318a7c344a 100644
--- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -1732,25 +1732,18 @@ public class CachingClusteredClientTest
       int partitions
   )
   {
-    final DataSegment segment = new DataSegment(
-        SegmentId.dummy(DATA_SOURCE),
-        null,
-        null,
-        null,
-        new HashBasedNumberedShardSpec(
-            partitionNum,
-            partitions,
-            partitionNum,
-            partitions,
-            partitionDimensions,
-            partitionFunction,
-            TestHelper.makeJsonMapper()
-        ),
-        null,
-        9,
-        0L
-    );
-
+    final DataSegment segment = 
DataSegment.builder(SegmentId.dummy(DATA_SOURCE))
+                                           .shardSpec(new 
HashBasedNumberedShardSpec(
+                                               partitionNum,
+                                               partitions,
+                                               partitionNum,
+                                               partitions,
+                                               partitionDimensions,
+                                               partitionFunction,
+                                               TestHelper.makeJsonMapper()
+                                           ))
+                                           .binaryVersion(9)
+                                           .build();
     ServerSelector selector = new ServerSelector(
         segment,
         new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy()),
@@ -1768,22 +1761,15 @@ public class CachingClusteredClientTest
       int partitionNum
   )
   {
-    final DataSegment segment = new DataSegment(
-        SegmentId.dummy(DATA_SOURCE),
-        null,
-        null,
-        null,
-        new SingleDimensionShardSpec(
-            dimension,
-            start,
-            end,
-            partitionNum,
-            SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
-        ),
-        null,
-        9,
-        0L
-    );
+    final DataSegment segment = 
DataSegment.builder(SegmentId.dummy(DATA_SOURCE))
+                                           .shardSpec(new 
SingleDimensionShardSpec(dimension,
+                                                                               
    start,
+                                                                               
    end,
+                                                                               
    partitionNum,
+                                                                               
    SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
+                                           ))
+                                           .binaryVersion(9)
+                                           .build();
 
     ServerSelector selector = new ServerSelector(
         segment,
@@ -2457,13 +2443,13 @@ public class CachingClusteredClientTest
               (DateTime) objects[i],
               new TimeseriesResultValue(
                   ImmutableMap.<String, Object>builder()
-                      .put("rows", objects[i + 1])
-                      .put("imps", objects[i + 2])
-                      .put("impers", objects[i + 2])
-                      .put("avg_imps_per_row", avg_impr)
-                      .put("avg_imps_per_row_half", avg_impr / 2)
-                      .put("avg_imps_per_row_double", avg_impr * 2)
-                      .build()
+                              .put("rows", objects[i + 1])
+                              .put("imps", objects[i + 2])
+                              .put("impers", objects[i + 2])
+                              .put("avg_imps_per_row", avg_impr)
+                              .put("avg_imps_per_row_half", avg_impr / 2)
+                              .put("avg_imps_per_row_double", avg_impr * 2)
+                              .build()
               )
           )
       );
@@ -2530,14 +2516,14 @@ public class CachingClusteredClientTest
         final double rows = ((Number) objects[index + 1]).doubleValue();
         values.add(
             ImmutableMap.<String, Object>builder()
-                .put(names.get(0), objects[index])
-                .put(names.get(1), rows)
-                .put(names.get(2), imps)
-                .put(names.get(3), imps)
-                .put(names.get(4), imps / rows)
-                .put(names.get(5), ((imps * 2) / rows))
-                .put(names.get(6), (imps / (rows * 2)))
-                .build()
+                        .put(names.get(0), objects[index])
+                        .put(names.get(1), rows)
+                        .put(names.get(2), imps)
+                        .put(names.get(3), imps)
+                        .put(names.get(4), imps / rows)
+                        .put(names.get(5), ((imps * 2) / rows))
+                        .put(names.get(6), (imps / (rows * 2)))
+                        .build()
         );
         index += 3;
       }
diff --git 
a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
 
b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
index 36bf4e688ef..f895198fd69 100644
--- 
a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.test.utils.ImmutableDruidDataSourceTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
+import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -38,6 +39,18 @@ import java.io.IOException;
 
 public class ImmutableDruidDataSourceTest
 {
+  private static final DataSegment TEST_SEGMENT = 
DataSegment.builder(SegmentId.of(
+                                                                 "test",
+                                                                 
Intervals.of("2017/2018"),
+                                                                 "version",
+                                                                 null
+                                                             ))
+                                                             
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                                             
.metrics(ImmutableList.of("met1", "met2"))
+                                                             
.projections(ImmutableList.of("proj1", "proj2"))
+                                                             .binaryVersion(1)
+                                                             .size(100L)
+                                                             .build();
 
   @Rule
   public ExpectedException expectedException = ExpectedException.none();
@@ -45,8 +58,7 @@ public class ImmutableDruidDataSourceTest
   @Test
   public void testSerde() throws IOException
   {
-    final DataSegment segment = getTestSegment();
-    final ImmutableDruidDataSource dataSource = 
getImmutableDruidDataSource(segment);
+    final ImmutableDruidDataSource dataSource = 
getImmutableDruidDataSource(TEST_SEGMENT);
 
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .setInjectableValues(new Std().addValue(PruneSpecsHolder.class, 
PruneSpecsHolder.DEFAULT));
@@ -59,13 +71,10 @@ public class ImmutableDruidDataSourceTest
   @Test
   public void testEqualsMethodThrowsUnsupportedOperationException()
   {
-    final DataSegment segment1 = getTestSegment();
+    final ImmutableDruidDataSource dataSource1 = 
getImmutableDruidDataSource(TEST_SEGMENT);
 
-    final ImmutableDruidDataSource dataSource1 = 
getImmutableDruidDataSource(segment1);
 
-    final DataSegment segment2 = getTestSegment();
-
-    final ImmutableDruidDataSource dataSource2 = 
getImmutableDruidDataSource(segment2);
+    final ImmutableDruidDataSource dataSource2 = 
getImmutableDruidDataSource(TEST_SEGMENT);
 
     Assert.assertThrows(
         "ImmutableDruidDataSource shouldn't be used as the key in containers",
@@ -74,37 +83,20 @@ public class ImmutableDruidDataSourceTest
     );
   }
 
-  private ImmutableDruidDataSource getImmutableDruidDataSource(DataSegment 
segment1)
+  private static ImmutableDruidDataSource 
getImmutableDruidDataSource(DataSegment segment1)
   {
     return new ImmutableDruidDataSource(
-      "test",
-      ImmutableMap.of("prop1", "val1", "prop2", "val2"),
-      ImmutableSortedMap.of(segment1.getId(), segment1)
-    );
-  }
-
-  private DataSegment getTestSegment()
-  {
-    return new DataSegment(
         "test",
-        Intervals.of("2017/2018"),
-        "version",
-        null,
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("met1", "met2"),
-        null,
-        null,
-        1,
-        100L,
-        PruneSpecsHolder.DEFAULT
+        ImmutableMap.of("prop1", "val1", "prop2", "val2"),
+        ImmutableSortedMap.of(segment1.getId(), segment1)
     );
   }
 
+
   @Test
   public void testHashCodeMethodThrowsUnsupportedOperationException()
   {
-    final DataSegment segment = getTestSegment();
-    final ImmutableDruidDataSource dataSource = 
getImmutableDruidDataSource(segment);
+    final ImmutableDruidDataSource dataSource = 
getImmutableDruidDataSource(TEST_SEGMENT);
 
     Assert.assertThrows(
         "ImmutableDruidDataSource shouldn't be used as the key in containers",
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java 
b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 035cf13bb26..100436c9d65 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.commons.text.StringEscapeUtils;
 import org.apache.druid.common.utils.IdUtilsTest;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -45,6 +46,7 @@ import 
org.apache.druid.java.util.common.granularity.DurationGranularity;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -557,6 +559,7 @@ public class DataSchemaTest extends 
InitializedNullHandlingTest
   @Test
   public void testSerde() throws Exception
   {
+    // deserialize, then serialize, then deserialize of DataSchema.
     String jsonStr = "{"
                      + "\"dataSource\":\"" + 
StringEscapeUtils.escapeJson(IdUtilsTest.VALID_ID_CHARS) + "\","
                      + "\"parser\":{"
@@ -582,30 +585,63 @@ public class DataSchemaTest extends 
InitializedNullHandlingTest
         DataSchema.class
     );
 
-    Assert.assertEquals(actual.getDataSource(), IdUtilsTest.VALID_ID_CHARS);
+    Assert.assertEquals(IdUtilsTest.VALID_ID_CHARS, actual.getDataSource());
     Assert.assertEquals(
-        actual.getParser().getParseSpec(),
         new JSONParseSpec(
             new TimestampSpec("xXx", null, null),
             
DimensionsSpec.builder().setDimensionExclusions(Arrays.asList("__time", 
"metric1", "xXx", "col1")).build(),
             null,
             null,
             null
-        )
+        ),
+        actual.getParser().getParseSpec()
     );
     Assert.assertArrayEquals(
-        actual.getAggregators(),
         new AggregatorFactory[]{
             new DoubleSumAggregatorFactory("metric1", "col1")
-        }
+        },
+        actual.getAggregators()
     );
     Assert.assertEquals(
-        actual.getGranularitySpec(),
         new ArbitraryGranularitySpec(
             new DurationGranularity(86400000, null),
             ImmutableList.of(Intervals.of("2014/2015"))
-        )
+        ),
+        actual.getGranularitySpec()
+    );
+    Assert.assertNull(actual.getProjections());
+  }
+
+  @Test
+  public void testSerdeWithProjections() throws Exception
+  {
+    // serialize, then deserialize of DataSchema with projections.
+    AggregateProjectionSpec projectionSpec = new AggregateProjectionSpec(
+        "ab_count_projection",
+        null,
+        Arrays.asList(
+            new StringDimensionSchema("a"),
+            new LongDimensionSchema("b")
+        ),
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count")
+        }
     );
+    DataSchema original = DataSchema.builder()
+                                    .withDataSource("datasource")
+                                    .withTimestamp(new TimestampSpec(null, 
null, null))
+                                    .withDimensions(DimensionsSpec.EMPTY)
+                                    .withAggregators(new 
CountAggregatorFactory("rows"))
+                                    
.withProjections(ImmutableList.of(projectionSpec))
+                                    .withGranularity(ARBITRARY_GRANULARITY)
+                                    .build();
+    DataSchema serdeResult = 
jsonMapper.readValue(jsonMapper.writeValueAsString(original), DataSchema.class);
+
+    Assert.assertEquals("datasource", serdeResult.getDataSource());
+    Assert.assertArrayEquals(new AggregatorFactory[]{new 
CountAggregatorFactory("rows")}, serdeResult.getAggregators());
+    Assert.assertEquals(ImmutableList.of(projectionSpec), 
serdeResult.getProjections());
+    Assert.assertEquals(ImmutableList.of("ab_count_projection"), 
serdeResult.getProjectionNames());
+    Assert.assertEquals(jsonMapper.writeValueAsString(original), 
jsonMapper.writeValueAsString(serdeResult));
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index 89792d85b82..99986d8d94a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.metadata;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import org.apache.druid.client.BrokerServerView;
 import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
@@ -62,7 +61,6 @@ import org.apache.druid.server.initialization.ServerConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
@@ -495,21 +493,13 @@ public class CoordinatorSegmentDataCacheConcurrencyTest 
extends SegmentMetadataC
     );
   }
 
-  private DataSegment newSegment(int partitionId)
+  private static DataSegment newSegment(int partitionId)
   {
-    return new DataSegment(
-        DATASOURCE,
-        Intervals.of("2012/2013"),
-        "version1",
-        null,
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(partitionId, 0),
-        null,
-        1,
-        100L,
-        PruneSpecsHolder.DEFAULT
-    );
+    return DataSegment.builder(SegmentId.of(DATASOURCE, 
Intervals.of("2012/2013"), "version1", partitionId))
+                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .binaryVersion(1)
+                      .size(100L)
+                      .build();
   }
 
   private QueryableIndex newQueryableIndex(int partitionId)
diff --git 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
index e8380b2f303..89bd878c8c6 100644
--- 
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
+++ 
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
@@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.junit.Rule;
@@ -255,19 +256,14 @@ public abstract class SegmentMetadataCacheTestBase 
extends InitializedNullHandli
                    .size(0)
                    .build();
 
-    realtimeSegment1 = new DataSegment(
-        DATASOURCE3,
-        Intervals.of("2012/2013"),
-        "version3",
-        null,
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("met1", "met2"),
-        new NumberedShardSpec(2, 3),
-        null,
-        1,
-        100L,
-        DataSegment.PruneSpecsHolder.DEFAULT
-    );
+    realtimeSegment1 = DataSegment.builder(SegmentId.of(DATASOURCE3, 
Intervals.of("2012/2013"), "version3", null))
+                                  .shardSpec(new NumberedShardSpec(2, 3))
+                                  .dimensions(ImmutableList.of("dim1", "dim2"))
+                                  .metrics(ImmutableList.of("met1", "met2"))
+                                  .projections(ImmutableList.of("proj1", 
"proj2"))
+                                  .binaryVersion(1)
+                                  .size(100L)
+                                  .build();
   }
 
   public void tearDown() throws Exception
@@ -302,18 +298,13 @@ public abstract class SegmentMetadataCacheTestBase 
extends InitializedNullHandli
 
   public DataSegment newSegment(String datasource, int partitionId)
   {
-    return new DataSegment(
-        datasource,
-        Intervals.of("2012/2013"),
-        "version1",
-        null,
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("met1", "met2"),
-        new NumberedShardSpec(partitionId, 0),
-        null,
-        1,
-        100L,
-        DataSegment.PruneSpecsHolder.DEFAULT
-    );
+    return DataSegment.builder(SegmentId.of(datasource, 
Intervals.of("2012/2013"), "version1", partitionId))
+                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .dimensions(ImmutableList.of("dim1", "dim2"))
+                      .metrics(ImmutableList.of("met1", "met2"))
+                      .projections(ImmutableList.of("proj1", "proj2"))
+                      .binaryVersion(1)
+                      .size(100L)
+                      .build();
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 4bf3a8dc22b..0eb46886e58 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -43,7 +43,6 @@ import 
org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
@@ -424,16 +423,14 @@ public class StreamAppenderatorDriverTest extends 
EasyMockSupport
       Set<DataSegment> allSegments = new HashSet<>(segmentsToPublish);
       int id = 0;
       for (DataSegment segment : segmentsToPublish) {
-        DataSegment upgradedSegment = new DataSegment(
-            SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, 
id),
-            segment.getLoadSpec(),
-            segment.getDimensions(),
-            segment.getMetrics(),
-            new NumberedShardSpec(id, 0),
-            null,
-            segment.getBinaryVersion(),
-            segment.getSize()
-        );
+        DataSegment upgradedSegment = DataSegment.builder(segment)
+                                                 .shardSpec(new 
NumberedShardSpec(id, 0))
+                                                 .dataSource(DATA_SOURCE)
+                                                 .interval(Intervals.ETERNITY)
+                                                 .version(UPGRADED_VERSION)
+                                                 .lastCompactionState(null)
+                                                 .build();
+
         id++;
         allSegments.add(upgradedSegment);
       }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index c3e04f04d6d..47293d8cdda 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -570,21 +571,19 @@ public class BatchDataSegmentAnnouncerTest
     }
   }
 
-  private DataSegment makeSegment(int offset, boolean isTombstone)
+  private static DataSegment makeSegment(int offset, boolean isTombstone)
   {
-    DataSegment.Builder builder = DataSegment.builder();
-    builder.dataSource("foo")
-           .interval(
-               new Interval(
-                   DateTimes.of("2013-01-01").plusDays(offset),
-                   DateTimes.of("2013-01-02").plusDays(offset)
-               )
-           )
-           .version(DateTimes.nowUtc().toString())
-           .dimensions(ImmutableList.of("dim1", "dim2"))
-           .metrics(ImmutableList.of("met1", "met2"))
-           .loadSpec(ImmutableMap.of("type", "local"))
-           .size(0);
+    Interval interval = new Interval(
+        DateTimes.of("2013-01-01").plusDays(offset),
+        DateTimes.of("2013-01-02").plusDays(offset)
+    );
+    SegmentId segmentId = SegmentId.of("foo", interval, 
DateTimes.nowUtc().toString(), null);
+    DataSegment.Builder builder = DataSegment.builder(segmentId)
+                                             .loadSpec(ImmutableMap.of("type", 
"local"))
+                                             
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                             .metrics(ImmutableList.of("met1", 
"met2"))
+                                             
.projections(ImmutableList.of("proj1", "proj2"))
+                                             .size(0);
     if (isTombstone) {
       builder.loadSpec(Collections.singletonMap("type", 
DataSegment.TOMBSTONE_LOADSPEC_TYPE));
     }
@@ -592,7 +591,7 @@ public class BatchDataSegmentAnnouncerTest
     return builder.build();
   }
 
-  private DataSegment makeSegment(int offset)
+  private static DataSegment makeSegment(int offset)
   {
     return makeSegment(offset, false);
   }
diff --git a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java 
b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
index ece46fd11eb..f6c6e510f64 100644
--- a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
+++ b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
@@ -431,17 +431,7 @@ public class ExportMetadata extends GuiceRunnable
     }
 
     if (newLoadSpec != null) {
-      segment = new DataSegment(
-          segment.getDataSource(),
-          segment.getInterval(),
-          segment.getVersion(),
-          newLoadSpec,
-          segment.getDimensions(),
-          segment.getMetrics(),
-          segment.getShardSpec(),
-          segment.getBinaryVersion(),
-          segment.getSize()
-      );
+      segment = DataSegment.builder(segment).loadSpec(newLoadSpec).build();
     }
 
     String serialized = JSON_MAPPER.writeValueAsString(segment);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 834a7dd217d..de32eb8f259 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -156,6 +156,7 @@ public class SystemSchema extends AbstractSchema
       .add("shard_spec", ColumnType.STRING)
       .add("dimensions", ColumnType.STRING)
       .add("metrics", ColumnType.STRING)
+      .add("projections", ColumnType.STRING)
       .add("last_compaction_state", ColumnType.STRING)
       .add("replication_factor", ColumnType.LONG)
       .build();
@@ -173,6 +174,7 @@ public class SystemSchema extends AbstractSchema
           SEGMENTS_SIGNATURE.indexOf("shard_spec"),
           SEGMENTS_SIGNATURE.indexOf("dimensions"),
           SEGMENTS_SIGNATURE.indexOf("metrics"),
+          SEGMENTS_SIGNATURE.indexOf("projections"),
           SEGMENTS_SIGNATURE.indexOf("last_compaction_state")
       }
   );
@@ -372,6 +374,7 @@ public class SystemSchema extends AbstractSchema
                 segment.getShardSpec(),
                 segment.getDimensions(),
                 segment.getMetrics(),
+                segment.getProjections(),
                 segment.getLastCompactionState(),
                 // If the segment is unpublished, we won't have this 
information yet.
                 // If the value is null, the load rules might have not 
evaluated yet, and we don't know the replication factor.
@@ -411,6 +414,7 @@ public class SystemSchema extends AbstractSchema
                 segment.getShardSpec(),
                 segment.getDimensions(),
                 segment.getMetrics(),
+                segment.getProjections(),
                 null, // unpublished segments from realtime tasks will not be 
compacted yet
                 REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished, 
we won't have this information yet.
             };
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index e74c1c50776..599ad73b200 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.sql.calcite.schema;
 
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
@@ -262,7 +261,10 @@ public class BrokerSegmentMetadataCacheConcurrencyTest 
extends BrokerSegmentMeta
         new NoopEscalator(),
         new InternalQueryConfig(),
         new NoopServiceEmitter(),
-        new PhysicalDatasourceMetadataFactory(new 
MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), segmentManager),
+        new PhysicalDatasourceMetadataFactory(
+            new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+            segmentManager
+        ),
         new NoopCoordinatorClient(),
         CentralizedDatasourceSchemaConfig.create()
     )
@@ -414,21 +416,13 @@ public class BrokerSegmentMetadataCacheConcurrencyTest 
extends BrokerSegmentMeta
     );
   }
 
-  private DataSegment newSegment(int partitionId)
+  private static DataSegment newSegment(int partitionId)
   {
-    return new DataSegment(
-        DATASOURCE,
-        Intervals.of("2012/2013"),
-        "version1",
-        null,
-        ImmutableList.of(),
-        ImmutableList.of(),
-        new NumberedShardSpec(partitionId, 0),
-        null,
-        1,
-        100L,
-        DataSegment.PruneSpecsHolder.DEFAULT
-    );
+    return DataSegment.builder(SegmentId.of(DATASOURCE, 
Intervals.of("2012/2013"), "version1", null))
+                      .shardSpec(new NumberedShardSpec(partitionId, 0))
+                      .binaryVersion(1)
+                      .size(100L)
+                      .build();
   }
 
   private QueryableIndex newQueryableIndex(int partitionId)
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index f9245a492e9..d0e5c012398 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -897,19 +897,19 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
 
     markDataSourceLatch = new CountDownLatch(1);
     refreshLatch = new CountDownLatch(1);
-    final DataSegment someNewBrokerSegment = new DataSegment(
-        "foo",
-        Intervals.of("2012/2013"),
-        "version1",
-        null,
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("met1", "met2"),
-        new NumberedShardSpec(2, 3),
-        null,
-        1,
-        100L,
-        DataSegment.PruneSpecsHolder.DEFAULT
-    );
+    final DataSegment someNewBrokerSegment = DataSegment.builder(SegmentId.of(
+                                                            "foo",
+                                                            
Intervals.of("2012/2013"),
+                                                            "version1",
+                                                            null
+                                                        ))
+                                                        .shardSpec(new 
NumberedShardSpec(2, 3))
+                                                        
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                                        
.metrics(ImmutableList.of("met1", "met2"))
+                                                        
.projections(ImmutableList.of("proj1", "proj2"))
+                                                        .binaryVersion(1)
+                                                        .size(100L)
+                                                        .build();
     segmentDataSourceNames.add("foo");
     joinableDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
@@ -964,19 +964,19 @@ public class BrokerSegmentMetadataCacheTest extends 
BrokerSegmentMetadataCacheTe
 
     markDataSourceLatch = new CountDownLatch(1);
     refreshLatch = new CountDownLatch(1);
-    final DataSegment someNewBrokerSegment = new DataSegment(
-        "foo",
-        Intervals.of("2012/2013"),
-        "version1",
-        null,
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("met1", "met2"),
-        new NumberedShardSpec(2, 3),
-        null,
-        1,
-        100L,
-        DataSegment.PruneSpecsHolder.DEFAULT
-    );
+    final DataSegment someNewBrokerSegment = DataSegment.builder(SegmentId.of(
+                                                            "foo",
+                                                            
Intervals.of("2012/2013"),
+                                                            "version1",
+                                                            null
+                                                        ))
+                                                        .shardSpec(new 
NumberedShardSpec(2, 3))
+                                                        
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                                        
.metrics(ImmutableList.of("met1", "met2"))
+                                                        
.projections(ImmutableList.of("proj1", "proj2"))
+                                                        .binaryVersion(1)
+                                                        .size(100L)
+                                                        .build();
     segmentDataSourceNames.add("foo");
     serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 34f08c64574..3d005c0426f 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
 import junitparams.converters.Nullable;
 import org.apache.calcite.DataContext;
@@ -45,6 +44,8 @@ import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidLeaderClient;
@@ -69,7 +70,6 @@ import 
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import 
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.IndexBuilder;
@@ -109,6 +109,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentStatusInCluster;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.joda.time.DateTime;
@@ -155,6 +156,17 @@ public class SystemSchemaTest extends CalciteTestBase
       TestDataBuilder.createRow(ImmutableMap.of("t", "2001-01-02", "m1", 
"8.0", "dim3", ImmutableList.of("xyz")))
   );
 
+  private static final ShardSpec SHARD_SPEC = new NumberedShardSpec(0, 1);
+  private static final IncrementalIndexSchema SCHEMA = new 
IncrementalIndexSchema.Builder()
+      .withDimensionsSpec(new DimensionsSpec(ImmutableList.of(new 
StringDimensionSchema("dim1"))))
+      .withMetrics(
+          new CountAggregatorFactory("cnt"),
+          new DoubleSumAggregatorFactory("m1", "m1"),
+          new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+      )
+      .withRollup(false)
+      .build();
+
   private SystemSchema schema;
   private SpecificSegmentsQuerySegmentWalker walker;
   private DruidLeaderClient client;
@@ -208,46 +220,27 @@ public class SystemSchemaTest extends CalciteTestBase
     final QueryableIndex index1 = IndexBuilder.create()
                                               .tmpDir(new File(tmpDir, "1"))
                                               
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
-                                              .schema(
-                                                  new 
IncrementalIndexSchema.Builder()
-                                                      .withMetrics(
-                                                          new 
CountAggregatorFactory("cnt"),
-                                                          new 
DoubleSumAggregatorFactory("m1", "m1"),
-                                                          new 
HyperUniquesAggregatorFactory("unique_dim1", "dim1")
-                                                      )
-                                                      .withRollup(false)
-                                                      .build()
-                                              )
+                                              .schema(SCHEMA)
                                               .rows(ROWS1)
                                               .buildMMappedIndex();
 
     final QueryableIndex index2 = IndexBuilder.create()
                                               .tmpDir(new File(tmpDir, "2"))
                                               
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
-                                              .schema(
-                                                  new 
IncrementalIndexSchema.Builder()
-                                                      .withMetrics(new 
LongSumAggregatorFactory("m1", "m1"))
-                                                      .withRollup(false)
-                                                      .build()
-                                              )
+                                              .schema(SCHEMA)
                                               .rows(ROWS2)
                                               .buildMMappedIndex();
     final QueryableIndex index3 = IndexBuilder.create()
                                               .tmpDir(new File(tmpDir, "3"))
                                               
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
-                                              .schema(
-                                                  new 
IncrementalIndexSchema.Builder()
-                                                      .withMetrics(new 
LongSumAggregatorFactory("m1", "m1"))
-                                                      .withRollup(false)
-                                                      .build()
-                                              )
+                                              .schema(SCHEMA)
                                               .rows(ROWS3)
                                               .buildMMappedIndex();
 
     walker = SpecificSegmentsQuerySegmentWalker.createWalker(conglomerate)
-        .add(segment1, index1)
-        .add(segment2, index2)
-        .add(segment3, index3);
+                                               .add(segment1, index1)
+                                               .add(segment2, index2)
+                                               .add(segment3, index3);
 
     BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache(
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
@@ -316,30 +309,31 @@ public class SystemSchemaTest extends CalciteTestBase
       1,
       83000L
   );
-  private final DataSegment publishedUncompactedSegment3 = new DataSegment(
-      "wikipedia3",
-      Intervals.of("2009/2010"),
-      "version3",
-      null,
-      ImmutableList.of("dim1", "dim2"),
-      ImmutableList.of("met1", "met2"),
-      null,
-      null,
-      1,
-      47000L
-  );
-
-  private final DataSegment segment1 = new DataSegment(
-      "test1",
-      Intervals.of("2010/2011"),
-      "version1",
-      null,
-      ImmutableList.of("dim1", "dim2"),
-      ImmutableList.of("met1", "met2"),
-      null,
-      1,
-      100L
-  );
+  private final DataSegment publishedUncompactedSegment3 = 
DataSegment.builder(SegmentId.of(
+                                                                          
"wikipedia3",
+                                                                          
Intervals.of("2009/2010"),
+                                                                          
"version3",
+                                                                          null
+                                                                      ))
+                                                                      
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                                                      
.metrics(ImmutableList.of("met1", "met2"))
+                                                                      
.projections(ImmutableList.of())
+                                                                      
.binaryVersion(1)
+                                                                      
.size(47000L)
+                                                                      .build();
+
+  private final DataSegment segment1 = DataSegment.builder(SegmentId.of(
+                                                      "test1",
+                                                      
Intervals.of("2010/2011"),
+                                                      "version1",
+                                                      null
+                                                  ))
+                                                  
.dimensions(ImmutableList.of("dim1", "dim2"))
+                                                  
.metrics(ImmutableList.of("met1", "met2"))
+                                                  
.projections(ImmutableList.of("proj1", "proj2"))
+                                                  .binaryVersion(1)
+                                                  .size(100L)
+                                                  .build();
   private final DataSegment segment2 = new DataSegment(
       "test2",
       Intervals.of("2011/2012"),
@@ -547,7 +541,7 @@ public class SystemSchemaTest extends CalciteTestBase
     final RelDataType rowType = segmentsTable.getRowType(new 
JavaTypeFactoryImpl());
     final List<RelDataTypeField> fields = rowType.getFieldList();
 
-    Assert.assertEquals(19, fields.size());
+    Assert.assertEquals(20, fields.size());
 
     final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable) 
schema.getTableMap().get("tasks");
     final RelDataType sysRowType = tasksTable.getRowType(new 
JavaTypeFactoryImpl());
@@ -585,138 +579,104 @@ public class SystemSchemaTest extends CalciteTestBase
     rows.sort((Object[] row1, Object[] row2) -> ((Comparable) 
row1[0]).compareTo(row2[0]));
 
     // total segments = 8
-    // segments test1, test2  are published and available
-    // segment test3 is served by historical but unpublished or unused
-    // segments test4, test5 are not published but available (realtime 
segments)
-    // segment test2 is both published and served by a realtime server.
-
     Assert.assertEquals(8, rows.size());
-
-    verifyRow(
-        rows.get(0),
-        "test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1",
-        100L,
-        0L, //partition_num
-        1L, //num_replicas
-        3L, //numRows
-        1L, //is_published
-        1L, //is_available
-        0L, //is_realtime
-        1L, //is_overshadowed
-        null, //is_compacted
-        2L  // replication_factor
-    );
-
-    verifyRow(
-        rows.get(1),
-        "test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2",
-        100L,
-        0L, //partition_num
-        2L, //x§segment test2 is served by historical and realtime servers
-        3L, //numRows
-        1L, //is_published
-        1L, //is_available
-        0L, //is_realtime
-        0L, //is_overshadowed,
-        null, //is_compacted
-        0L  // replication_factor
-    );
-
-    //segment test3 is unpublished and has a NumberedShardSpec with 
partitionNum = 2
-    verifyRow(
-        rows.get(2),
-        "test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2",
-        100L,
-        2L, //partition_num
-        1L, //num_replicas
-        2L, //numRows
-        0L, //is_published
-        1L, //is_available
-        0L, //is_realtime
-        0L, //is_overshadowed
-        null, //is_compacted
-        -1L   // replication_factor
-    );
-
-    verifyRow(
-        rows.get(3),
-        "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4",
-        100L,
-        0L, //partition_num
-        1L, //num_replicas
-        0L, //numRows
-        0L, //is_published
-        1L, //is_available
-        1L, //is_realtime
-        0L, //is_overshadowed
-        null, //is_compacted
-        -1L  // replication_factor
-    );
-
-    verifyRow(
-        rows.get(4),
-        "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5",
-        100L,
-        0L, //partition_num
-        1L, //num_replicas
-        0L, //numRows
-        0L, //is_published
-        1L, //is_available
-        1L, //is_realtime
-        0L, //is_overshadowed
-        null, //is_compacted
-        -1L  // replication_factor
-    );
-
-    // wikipedia segments are published and unavailable, num_replicas is 0
-    // wikipedia segment 1 and 2 are compacted while 3 are not compacted
-    verifyRow(
-        rows.get(5),
-        
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
-        53000L,
-        0L, //partition_num
-        0L, //num_replicas
-        0L, //numRows
-        1L, //is_published
-        0L, //is_available
-        0L, //is_realtime
-        1L, //is_overshadowed
-        expectedCompactionState, //is_compacted
-        2L  // replication_factor
-    );
-
-    verifyRow(
-        rows.get(6),
-        
"wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2",
-        83000L,
-        0L, //partition_num
-        0L, //num_replicas
-        0L, //numRows
-        1L, //is_published
-        0L, //is_available
-        0L, //is_realtime
-        0L, //is_overshadowed
-        expectedCompactionState, //is_compacted
-        0L  // replication_factor
-    );
-
-    verifyRow(
-        rows.get(7),
-        
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3",
-        47000L,
-        0L, //partition_num
-        0L, //num_replicas
-        0L, //numRows
-        1L, //is_published
-        0L, //is_available
-        0L, //is_realtime
-        0L, //is_overshadowed
-        null, //is_compacted
-        2L  // replication_factor
-    );
-
     // Verify value types.
     verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
+
+    // segments test1, test2  are published and available.
+    Object[] segment1Expected = new Object[]{
+        // segment_id, datasource
+        "test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", 
"test1",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2010-01-01T00:00:00.000Z", "2011-01-01T00:00:00.000Z", 100L, 
"version1", 0L, 1L, 3L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        0L, 1L, 1L, 0L, 1L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", "[\"proj1\",\"proj2\"]", 
null, 2L
+    };
+    Assert.assertArrayEquals(segment1Expected, rows.get(0));
+    Object[] segment2Expected = new Object[]{
+        // segment_id, datasource
+        "test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", 
"test2",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2011-01-01T00:00:00.000Z", "2012-01-01T00:00:00.000Z", 100L, 
"version2", 0L, 2L, 3L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        1L, 1L, 1L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, 0L
+    };
+    Assert.assertArrayEquals(segment2Expected, rows.get(1));
+    //segment test3 is unpublished and has a NumberedShardSpec with 
partitionNum = 2, is served by historical but unpublished or unused
+    Object[] segment3Expected = new Object[]{
+        // segment_id, datasource
+        "test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2", 
"test3",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2012-01-01T00:00:00.000Z", "2013-01-01T00:00:00.000Z", 100L, 
"version3", 2L, 1L, 2L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        0L, 0L, 1L, 0L, 0L, MAPPER.writeValueAsString(new NumberedShardSpec(2, 
3)),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+    };
+    Assert.assertArrayEquals(segment3Expected, rows.get(2));
+    // segments test4, test5 are not published but available (realtime 
segments)
+    Object[] segment4Expected = new Object[]{
+        // segment_id, datasource
+        "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", 
"test4",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2014-01-01T00:00:00.000Z", "2015-01-01T00:00:00.000Z", 100L, 
"version4", 0L, 1L, 0L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        1L, 0L, 1L, 1L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+    };
+    Assert.assertArrayEquals(segment4Expected, rows.get(3));
+    Object[] segment5Expected = new Object[]{
+        // segment_id, datasource
+        "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", 
"test5",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2015-01-01T00:00:00.000Z", "2016-01-01T00:00:00.000Z", 100L, 
"version5", 0L, 1L, 0L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        1L, 0L, 1L, 1L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+    };
+    Assert.assertArrayEquals(segment5Expected, rows.get(4));
+
+    // wikipedia segment 1 and segment 2 are published and unavailable and 
compacted, num_replicas is 0
+    Object[] wikiSegment1Expected = new Object[]{
+        // segment_id, datasource
+        
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", 
"wikipedia1",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2007-01-01T00:00:00.000Z", "2008-01-01T00:00:00.000Z", 53_000L, 
"version1", 0L, 0L, 0L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        0L, 1L, 0L, 0L, 1L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, 
MAPPER.writeValueAsString(expectedCompactionState), 2L
+    };
+    Assert.assertArrayEquals(wikiSegment1Expected, rows.get(5));
+    Object[] wikiSegment2Expected = new Object[]{
+        // segment_id, datasource
+        
"wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2", 
"wikipedia2",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2008-01-01T00:00:00.000Z", "2009-01-01T00:00:00.000Z", 83_000L, 
"version2", 0L, 0L, 0L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        1L, 1L, 0L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, 
MAPPER.writeValueAsString(expectedCompactionState), 0L
+    };
+    Assert.assertArrayEquals(wikiSegment2Expected, rows.get(6));
+    // wikipedia segment 3 are not compacted, and is projection aware.
+    Object[] wikiSegment3Expected = new Object[]{
+        // segment_id, datasource
+        
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3", 
"wikipedia3",
+        // start, end, size, version, partition_num, num_replicas, numRows
+        "2009-01-01T00:00:00.000Z", "2010-01-01T00:00:00.000Z", 47_000L, 
"version3", 0L, 0L, 0L,
+        //  is_active, is_published, is_available, is_realtime, 
is_overshadowed, shard_spec
+        1L, 1L, 0L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+        // dimensions, metrics, projections, last_compaction_state, 
replication_factor
+        "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", "[]", null, 2L
+    };
+    Assert.assertArrayEquals(wikiSegment3Expected, rows.get(7));
   }
 
   @Test
@@ -787,44 +747,6 @@ public class SystemSchemaTest extends CalciteTestBase
     );
   }
 
-  private void verifyRow(
-      Object[] row,
-      String segmentId,
-      long size,
-      long partitionNum,
-      long numReplicas,
-      long numRows,
-      long isPublished,
-      long isAvailable,
-      long isRealtime,
-      long isOvershadowed,
-      CompactionState compactionState,
-      long replicationFactor
-  ) throws Exception
-  {
-    Assert.assertEquals(segmentId, row[0].toString());
-    SegmentId id = 
Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
-    Assert.assertEquals(id.getDataSource(), row[1]);
-    Assert.assertEquals(id.getIntervalStart().toString(), row[2]);
-    Assert.assertEquals(id.getIntervalEnd().toString(), row[3]);
-    Assert.assertEquals(size, row[4]);
-    Assert.assertEquals(id.getVersion(), row[5]);
-    Assert.assertEquals(partitionNum, row[6]);
-    Assert.assertEquals(numReplicas, row[7]);
-    Assert.assertEquals(numRows, row[8]);
-    Assert.assertEquals((((isPublished == 1) && (isOvershadowed == 0)) || 
(isRealtime == 1)) ? 1L : 0L, row[9]);
-    Assert.assertEquals(isPublished, row[10]);
-    Assert.assertEquals(isAvailable, row[11]);
-    Assert.assertEquals(isRealtime, row[12]);
-    Assert.assertEquals(isOvershadowed, row[13]);
-    if (compactionState == null) {
-      Assert.assertNull(row[17]);
-    } else {
-      Assert.assertEquals(MAPPER.writeValueAsString(compactionState), row[17]);
-    }
-    Assert.assertEquals(replicationFactor, row[18]);
-  }
-
   @Test
   public void testServersTable() throws URISyntaxException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to