This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/34.0.0 by this push:
new eb7847c0d01 Add projection names to DataSegment in a backward
compatible manner. (#18223) (#18287)
eb7847c0d01 is described below
commit eb7847c0d015c3215cfdb03c30dbeddf4227889f
Author: Lucas Capistrant <[email protected]>
AuthorDate: Fri Jul 18 15:03:21 2025 -0500
Add projection names to DataSegment in a backward compatible manner.
(#18223) (#18287)
* Add projection names to DataSegment in a backward compatible manner.
* fix build
* fix build2
* fix unit test, consolidate projection serde to always convert missing key
& null value to empty collection.
* revert some previous changes that converts null projections to empty
* fix test
* add projections to sys.segments
* fix it
* style, and unit test
* update DataSchemaTest
* IT test
* fix ldap test
* fix build
* add sleep for flaky test
* add test
* small data segment change
* small data segment change
Co-authored-by: Cece Mei <[email protected]>
---
.../msq/input/table/DataSegmentWithLocation.java | 31 +-
.../DataSegmentAndIndexZipFilePathTest.java | 26 +-
.../indexing/common/actions/TaskLocksTest.java | 82 ++---
.../concurrent/ConcurrentReplaceAndAppendTest.java | 14 +-
.../ConcurrentReplaceAndStreamingAppendTest.java | 14 +-
.../druid/testsEx/auth/ITSecurityBasicQuery.java | 3 +
.../docker/test-data/ldap-security-sample-data.sql | 2 +-
.../docker/test-data/security-sample-data.sql | 2 +-
.../results/auth_test_sys_schema_segments.json | 1 +
.../druid/jackson/CommaListJoinSerializer.java | 7 +
.../org/apache/druid/timeline/DataSegment.java | 185 +++++++----
.../org/apache/druid/timeline/DataSegmentTest.java | 143 +++++---
.../druid/timeline/SegmentStatusInClusterTest.java | 39 ++-
.../IndexerSQLMetadataStorageCoordinator.java | 33 +-
.../apache/druid/segment/indexing/DataSchema.java | 13 +-
.../realtime/appenderator/StreamAppenderator.java | 50 ++-
.../apache/druid/segment/realtime/sink/Sink.java | 19 +-
.../server/coordination/LoadableDataSegment.java | 12 +-
.../CachingClusteredClientCacheKeyManagerTest.java | 11 +-
.../druid/client/CachingClusteredClientTest.java | 86 +++--
.../druid/client/ImmutableDruidDataSourceTest.java | 50 ++-
.../druid/segment/indexing/DataSchemaTest.java | 50 ++-
...CoordinatorSegmentDataCacheConcurrencyTest.java | 22 +-
.../metadata/SegmentMetadataCacheTestBase.java | 43 +--
.../appenderator/StreamAppenderatorDriverTest.java | 19 +-
.../BatchDataSegmentAnnouncerTest.java | 29 +-
.../java/org/apache/druid/cli/ExportMetadata.java | 12 +-
.../druid/sql/calcite/schema/SystemSchema.java | 4 +
.../BrokerSegmentMetadataCacheConcurrencyTest.java | 26 +-
.../schema/BrokerSegmentMetadataCacheTest.java | 52 +--
.../druid/sql/calcite/schema/SystemSchemaTest.java | 360 ++++++++-------------
31 files changed, 694 insertions(+), 746 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
index 0e83e9c3ede..92d017a0dba 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java
@@ -51,14 +51,11 @@ public class DataSegmentWithLocation extends DataSegment
@JsonProperty("version") String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization
to prevent dependency pollution
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
- @JsonProperty("dimensions")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
+ @JsonProperty("dimensions") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable
List<String> dimensions,
- @JsonProperty("metrics")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> metrics,
+ @JsonProperty("metrics") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+ @JsonProperty("projections") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable
+ List<String> projections,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState
lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@@ -67,7 +64,20 @@ public class DataSegmentWithLocation extends DataSegment
@JacksonInject PruneSpecsHolder pruneSpecsHolder
)
{
- super(dataSource, interval, version, loadSpec, dimensions, metrics,
shardSpec, lastCompactionState, binaryVersion, size, pruneSpecsHolder);
+ super(
+ dataSource,
+ interval,
+ version,
+ loadSpec,
+ dimensions,
+ metrics,
+ projections,
+ shardSpec,
+ lastCompactionState,
+ binaryVersion,
+ size,
+ pruneSpecsHolder
+ );
this.servers = Preconditions.checkNotNull(servers, "servers");
}
@@ -83,9 +93,12 @@ public class DataSegmentWithLocation extends DataSegment
dataSegment.getLoadSpec(),
dataSegment.getDimensions(),
dataSegment.getMetrics(),
+ dataSegment.getProjections(),
dataSegment.getShardSpec(),
+ null,
dataSegment.getBinaryVersion(),
- dataSegment.getSize()
+ dataSegment.getSize(),
+ PruneSpecsHolder.DEFAULT
);
this.servers = servers;
}
diff --git
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
index 4eae8064c2d..aac1056def2 100644
---
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
+++
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DataSegmentAndIndexZipFilePathTest.java
@@ -35,26 +35,12 @@ public class DataSegmentAndIndexZipFilePathTest
{
private static final SegmentId SEGMENT_ID = SegmentId.dummy("data-source",
1);
private static final SegmentId OTHER_SEGMENT_ID =
SegmentId.dummy("data-source2", 1);
- private static final DataSegment SEGMENT = new DataSegment(
- SEGMENT_ID,
- null,
- null,
- null,
- new NumberedShardSpec(1, 10),
- null,
- 0,
- 0
- );
- private static final DataSegment OTHER_SEGMENT = new DataSegment(
- OTHER_SEGMENT_ID,
- null,
- null,
- null,
- new NumberedShardSpec(1, 10),
- null,
- 0,
- 0
- );
+ private static final DataSegment SEGMENT = DataSegment.builder(SEGMENT_ID)
+ .shardSpec(new
NumberedShardSpec(1, 10))
+ .build();
+ private static final DataSegment OTHER_SEGMENT =
DataSegment.builder(OTHER_SEGMENT_ID)
+ .shardSpec(new
NumberedShardSpec(1, 10))
+ .build();
private DataSegmentAndIndexZipFilePath target;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
index 3fd3cd9cbd9..0d4ef09a789 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
@@ -77,28 +78,17 @@ public class TaskLocksTest
private Set<DataSegment> createTimeChunkedSegments()
{
+ final String version = DateTimes.nowUtc().toString();
return ImmutableSet.of(
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(DateTimes.nowUtc().toString())
- .shardSpec(new LinearShardSpec(2))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-02/2017-01-03"))
- .version(DateTimes.nowUtc().toString())
- .shardSpec(new LinearShardSpec(2))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-03/2017-01-04"))
- .version(DateTimes.nowUtc().toString())
- .shardSpec(new LinearShardSpec(2))
- .size(0)
- .build()
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new LinearShardSpec(2))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-02/2017-01-03"), version, null))
+ .shardSpec(new LinearShardSpec(2))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-03/2017-01-04"), version, null))
+ .shardSpec(new LinearShardSpec(2))
+ .build()
);
}
@@ -106,41 +96,21 @@ public class TaskLocksTest
{
final String version = DateTimes.nowUtc().toString();
return ImmutableSet.of(
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(version)
- .shardSpec(new NumberedShardSpec(0, 0))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(version)
- .shardSpec(new NumberedShardSpec(1, 0))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(version)
- .shardSpec(new NumberedShardSpec(2, 0))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(version)
- .shardSpec(new NumberedShardSpec(3, 0))
- .size(0)
- .build(),
- new DataSegment.Builder()
- .dataSource(task.getDataSource())
- .interval(Intervals.of("2017-01-01/2017-01-02"))
- .version(version)
- .shardSpec(new NumberedShardSpec(4, 0))
- .size(0)
- .build()
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new NumberedShardSpec(0, 0))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new NumberedShardSpec(1, 0))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new NumberedShardSpec(2, 0))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new NumberedShardSpec(3, 0))
+ .build(),
+ DataSegment.builder(SegmentId.of(task.getDataSource(),
Intervals.of("2017-01-01/2017-01-02"), version, null))
+ .shardSpec(new NumberedShardSpec(4, 0))
+ .build()
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index be2d351f1c5..a6c0d772647 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -1221,16 +1221,10 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
- return new DataSegment(
- id,
- Collections.singletonMap(id.toString(), id.toString()),
- Collections.emptyList(),
- Collections.emptyList(),
- pendingSegment.getShardSpec(),
- null,
- 0,
- 0
- );
+ return DataSegment.builder(id)
+ .loadSpec(Collections.singletonMap(id.toString(),
id.toString()))
+ .shardSpec(pendingSegment.getShardSpec())
+ .build();
}
private void verifyIntervalHasUsedSegments(Interval interval, DataSegment...
expectedSegments)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
index 8f374ff16b5..868d8c15568 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java
@@ -680,16 +680,10 @@ public class ConcurrentReplaceAndStreamingAppendTest
extends IngestionTestBase
private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment)
{
final SegmentId id = pendingSegment.asSegmentId();
- return new DataSegment(
- id,
- Collections.singletonMap(id.toString(), id.toString()),
- Collections.emptyList(),
- Collections.emptyList(),
- pendingSegment.getShardSpec(),
- null,
- 0,
- 0
- );
+ return DataSegment.builder(id)
+ .loadSpec(Collections.singletonMap(id.toString(),
id.toString()))
+ .shardSpec(pendingSegment.getShardSpec())
+ .build();
}
private void verifyIntervalHasUsedSegments(Interval interval, DataSegment...
expectedSegments)
diff --git
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
index 399a5e3b798..3612faa9eb0 100644
---
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
+++
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/auth/ITSecurityBasicQuery.java
@@ -105,6 +105,9 @@ public class ITSecurityBasicQuery
List<ResourceAction> permissions = ImmutableList.of();
securityClient.setPermissionsToRole(ROLE_1, permissions);
+ // Allow permissions sync across cluster to avoid flakes
+ Thread.sleep(SYNC_SLEEP);
+
String queryLocal =
StringUtils.format(
"INSERT INTO %s\n"
diff --git a/integration-tests/docker/test-data/ldap-security-sample-data.sql
b/integration-tests/docker/test-data/ldap-security-sample-data.sql
index 732cc55d4a5..5e4774663c0 100644
--- a/integration-tests/docker/test-data/ldap-security-sample-data.sql
+++ b/integration-tests/docker/test-data/ldap-security-sample-data.sql
@@ -14,4 +14,4 @@
-- limitations under the License.
INSERT INTO druid_tasks (id, created_date, datasource, payload,
status_payload, active) VALUES ('index_auth_test_2030-04-30T01:13:31.893Z',
'2030-04-30T01:13:31.893Z', 'auth_test',
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"created_date\":\"2030-04-30T01:13:31.893Z\",\"datasource\":\"auth_test\",\"active\":0}',
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"status\":\"SUCCESS\",\"duration\":1}',
0);
-INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
VALUES
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
[...]
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
VALUES
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
[...]
diff --git a/integration-tests/docker/test-data/security-sample-data.sql
b/integration-tests/docker/test-data/security-sample-data.sql
index 732cc55d4a5..5e4774663c0 100644
--- a/integration-tests/docker/test-data/security-sample-data.sql
+++ b/integration-tests/docker/test-data/security-sample-data.sql
@@ -14,4 +14,4 @@
-- limitations under the License.
INSERT INTO druid_tasks (id, created_date, datasource, payload,
status_payload, active) VALUES ('index_auth_test_2030-04-30T01:13:31.893Z',
'2030-04-30T01:13:31.893Z', 'auth_test',
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"created_date\":\"2030-04-30T01:13:31.893Z\",\"datasource\":\"auth_test\",\"active\":0}',
'{\"id\":\"index_auth_test_2030-04-30T01:13:31.893Z\",\"status\":\"SUCCESS\",\"duration\":1}',
0);
-INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
VALUES
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
[...]
+INSERT INTO druid_segments
(id,dataSource,created_date,start,end,partitioned,version,used,payload,used_status_last_updated)
VALUES
('auth_test_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','auth_test','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\
[...]
diff --git
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index 1ce7b44bc61..1e3ae5ece90 100644
---
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -17,6 +17,7 @@
"shard_spec": "{\"type\":\"none\"}",
"dimensions":
"[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
"metrics":
"[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
+ "projections": "[\"daily_added\",\"daily_unique_users\"]",
"last_compaction_state": null,
"replication_factor": 2
}
diff --git
a/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
b/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
index 2ef3835419d..00ed1c5e31f 100644
---
a/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
+++
b/processing/src/main/java/org/apache/druid/jackson/CommaListJoinSerializer.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.List;
/**
+ * A custom Jackson serializer that converts a list of objects into a
comma-separated string.
*/
public class CommaListJoinSerializer extends StdScalarSerializer<List<String>>
{
@@ -43,4 +44,10 @@ public class CommaListJoinSerializer extends
StdScalarSerializer<List<String>>
{
jgen.writeString(JOINER.join(value));
}
+
+ @Override
+ public boolean isEmpty(SerializerProvider prov, List<String> value)
+ {
+ return value.isEmpty();
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
index c0560a83985..4dc9b8edd94 100644
--- a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -45,11 +45,11 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+
/**
* Metadata of Druid's data segment. An immutable object.
- *
+ * <p>
* DataSegment's equality ({@link #equals}/{@link #hashCode}) and {@link
#compareTo} methods consider only the
* {@link SegmentId} of the segment.
*/
@@ -86,6 +86,7 @@ public class DataSegment implements Comparable<DataSegment>,
Overshadowable<Data
private static final Interner<String> STRING_INTERNER =
Interners.newWeakInterner();
private static final Interner<List<String>> DIMENSIONS_INTERNER =
Interners.newWeakInterner();
private static final Interner<List<String>> METRICS_INTERNER =
Interners.newWeakInterner();
+ private static final Interner<List<String>> PROJECTIONS_INTERNER =
Interners.newWeakInterner();
private static final Interner<CompactionState> COMPACTION_STATE_INTERNER =
Interners.newWeakInterner();
private static final Map<String, Object> PRUNED_LOAD_SPEC = ImmutableMap.of(
"load spec is pruned, because it's not needed on Brokers, but eats a lot
of heap space",
@@ -98,6 +99,7 @@ public class DataSegment implements Comparable<DataSegment>,
Overshadowable<Data
private final Map<String, Object> loadSpec;
private final List<String> dimensions;
private final List<String> metrics;
+ private final List<String> projections;
private final ShardSpec shardSpec;
/**
@@ -111,7 +113,10 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
private final CompactionState lastCompactionState;
private final long size;
- @VisibleForTesting
+ /**
+ * @deprecated use {@link #builder(SegmentId)} or {@link
#builder(DataSegment)} instead.
+ */
+ @Deprecated
public DataSegment(
SegmentId segmentId,
Map<String, Object> loadSpec,
@@ -130,13 +135,19 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
loadSpec,
dimensions,
metrics,
+ null,
shardSpec,
lastCompactionState,
binaryVersion,
- size
+ size,
+ PruneSpecsHolder.DEFAULT
);
}
+ /**
+ * @deprecated use {@link #builder(SegmentId)} or {@link
#builder(DataSegment)} instead.
+ */
+ @Deprecated
public DataSegment(
String dataSource,
Interval interval,
@@ -156,13 +167,19 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
loadSpec,
dimensions,
metrics,
+ null,
shardSpec,
null,
binaryVersion,
- size
+ size,
+ PruneSpecsHolder.DEFAULT
);
}
+ /**
+ * @deprecated use {@link #builder(SegmentId)} or {@link
#builder(DataSegment)} instead.
+ */
+ @Deprecated
public DataSegment(
String dataSource,
Interval interval,
@@ -183,6 +200,7 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
loadSpec,
dimensions,
metrics,
+ null,
shardSpec,
lastCompactionState,
binaryVersion,
@@ -198,14 +216,11 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
@JsonProperty("version") String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization
to prevent dependency pollution
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
- @JsonProperty("dimensions")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> dimensions,
- @JsonProperty("metrics")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> metrics,
+ @JsonProperty("dimensions") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable
+ List<String> dimensions,
+ @JsonProperty("metrics") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+ @JsonProperty("projections") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable
+ List<String> projections,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState
lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@@ -216,10 +231,11 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
this.id = SegmentId.of(dataSource, interval, version, shardSpec);
// prune loadspec if needed
this.loadSpec = pruneSpecsHolder.pruneLoadSpec ? PRUNED_LOAD_SPEC :
prepareLoadSpec(loadSpec);
- // Deduplicating dimensions and metrics lists as a whole because they are
very likely the same for the same
- // dataSource
- this.dimensions = prepareDimensionsOrMetrics(dimensions,
DIMENSIONS_INTERNER);
- this.metrics = prepareDimensionsOrMetrics(metrics, METRICS_INTERNER);
+ this.dimensions = dimensions == null ? ImmutableList.of() :
prepareWithInterner(dimensions, DIMENSIONS_INTERNER);
+ this.metrics = metrics == null ? ImmutableList.of() :
prepareWithInterner(metrics, METRICS_INTERNER);
+ // A null value for projections means that this segment is not aware of
projections (launched in druid 32).
+ // An empty list means that this segment is projection-aware, but has no
projections.
+ this.projections = projections == null ? null :
prepareWithInterner(projections, PROJECTIONS_INTERNER);
this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) :
shardSpec;
this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState
? null
@@ -229,46 +245,6 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
this.size = size;
}
- @Nullable
- private Map<String, Object> prepareLoadSpec(@Nullable Map<String, Object>
loadSpec)
- {
- if (loadSpec == null) {
- return null;
- }
- // Load spec is just of 3 entries on average; HashMap/LinkedHashMap
consumes much more memory than ArrayMap
- Map<String, Object> result = new Object2ObjectArrayMap<>(loadSpec.size());
- for (Map.Entry<String, Object> e : loadSpec.entrySet()) {
- result.put(STRING_INTERNER.intern(e.getKey()), e.getValue());
- }
- return result;
- }
-
- @Nullable
- private CompactionState prepareCompactionState(@Nullable CompactionState
lastCompactionState)
- {
- if (lastCompactionState == null) {
- return null;
- }
- return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
- }
-
- private List<String> prepareDimensionsOrMetrics(@Nullable List<String> list,
Interner<List<String>> interner)
- {
- if (list == null) {
- return ImmutableList.of();
- } else {
- List<String> result = list
- .stream()
- .filter(s -> !Strings.isNullOrEmpty(s))
- // dimensions & metrics are stored as canonical string values to
decrease memory required for storing
- // large numbers of segments.
- .map(STRING_INTERNER::intern)
- // TODO replace with ImmutableList.toImmutableList() when updated to
Guava 21+
- .collect(Collectors.collectingAndThen(Collectors.toList(),
ImmutableList::copyOf));
- return interner.intern(result);
- }
- }
-
/**
* Get dataSource
*
@@ -314,6 +290,15 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
return metrics;
}
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonSerialize(using = CommaListJoinSerializer.class)
+ public List<String> getProjections()
+ {
+ return projections;
+ }
+
@JsonProperty
public ShardSpec getShardSpec()
{
@@ -416,6 +401,11 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
return builder(this).metrics(metrics).build();
}
+ public DataSegment withProjections(List<String> projections)
+ {
+ return builder(this).projections(projections).build();
+ }
+
public DataSegment withShardSpec(ShardSpec newSpec)
{
return builder(this).shardSpec(newSpec).build();
@@ -471,17 +461,65 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
", loadSpec=" + loadSpec +
", dimensions=" + dimensions +
", metrics=" + metrics +
+ ", projections=" + projections +
", shardSpec=" + shardSpec +
", lastCompactionState=" + lastCompactionState +
", size=" + size +
'}';
}
+
+ @Nullable
+ private static Map<String, Object> prepareLoadSpec(@Nullable Map<String,
Object> loadSpec)
+ {
+ if (loadSpec == null) {
+ return null;
+ }
+ // Load spec is just of 3 entries on average; HashMap/LinkedHashMap
consumes much more memory than ArrayMap
+ Map<String, Object> result = new Object2ObjectArrayMap<>(loadSpec.size());
+ for (Map.Entry<String, Object> e : loadSpec.entrySet()) {
+ result.put(STRING_INTERNER.intern(e.getKey()), e.getValue());
+ }
+ return result;
+ }
+
+ @Nullable
+ private static CompactionState prepareCompactionState(@Nullable
CompactionState lastCompactionState)
+ {
+ if (lastCompactionState == null) {
+ return null;
+ }
+ return COMPACTION_STATE_INTERNER.intern(lastCompactionState);
+ }
+
+ /**
+ * Returns a list of strings with all empty strings removed and all strings
interned.
+ * <p>
+ * The dimensions, metrics, and projections are stored as canonical string
values to decrease memory required for
+ * storing large numbers of segments.
+ */
+ private static List<String> prepareWithInterner(List<String> list,
Interner<List<String>> interner)
+ {
+ return interner.intern(list.stream()
+ .filter(s -> !Strings.isNullOrEmpty(s))
+ .map(STRING_INTERNER::intern)
+ .collect(ImmutableList.toImmutableList()));
+ }
+
+ /**
+ * @deprecated use {@link #builder(SegmentId)} or {@link
#builder(DataSegment)} instead.
+ */
+ @Deprecated
public static Builder builder()
{
return new Builder();
}
+ public static Builder builder(SegmentId segmentId)
+ {
+ return new Builder(segmentId);
+ }
+
public static Builder builder(DataSegment segment)
{
return new Builder(segment);
@@ -495,21 +533,39 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
private Map<String, Object> loadSpec;
private List<String> dimensions;
private List<String> metrics;
+ private List<String> projections;
private ShardSpec shardSpec;
private CompactionState lastCompactionState;
private Integer binaryVersion;
private long size;
- public Builder()
+ /**
+ * @deprecated use {@link #Builder(SegmentId)} or {@link
#Builder(DataSegment)} instead.
+ */
+ @Deprecated
+ private Builder()
{
this.loadSpec = ImmutableMap.of();
this.dimensions = ImmutableList.of();
this.metrics = ImmutableList.of();
+ // By default, segment is not projection-aware.
+ this.projections = null;
this.shardSpec = new NumberedShardSpec(0, 1);
this.size = -1;
}
- public Builder(DataSegment segment)
+ private Builder(SegmentId segmentId)
+ {
+ this.dataSource = segmentId.getDataSource();
+ this.interval = segmentId.getInterval();
+ this.version = segmentId.getVersion();
+ this.shardSpec = new NumberedShardSpec(0, 1);
+ this.binaryVersion = 0;
+ this.size = 0;
+ this.lastCompactionState = null;
+ }
+
+ private Builder(DataSegment segment)
{
this.dataSource = segment.getDataSource();
this.interval = segment.getInterval();
@@ -517,6 +573,7 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
this.loadSpec = segment.getLoadSpec();
this.dimensions = segment.getDimensions();
this.metrics = segment.getMetrics();
+ this.projections = segment.getProjections();
this.shardSpec = segment.getShardSpec();
this.lastCompactionState = segment.getLastCompactionState();
this.binaryVersion = segment.getBinaryVersion();
@@ -559,6 +616,12 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
return this;
}
+ public Builder projections(List<String> projections)
+ {
+ this.projections = projections;
+ return this;
+ }
+
public Builder shardSpec(ShardSpec shardSpec)
{
this.shardSpec = shardSpec;
@@ -598,10 +661,12 @@ public class DataSegment implements
Comparable<DataSegment>, Overshadowable<Data
loadSpec,
dimensions,
metrics,
+ projections,
shardSpec,
lastCompactionState,
binaryVersion,
- size
+ size,
+ PruneSpecsHolder.DEFAULT
);
}
}
diff --git
a/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
b/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 1f0a4f68ff0..54caacdf48b 100644
--- a/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -57,6 +57,7 @@ import java.util.Set;
import java.util.function.Function;
/**
+ * Unit tests for {@link DataSegment}, covering construction, serialization,
and utility methods.
*/
public class DataSegmentTest
{
@@ -114,6 +115,62 @@ public class DataSegmentTest
MAPPER.setInjectableValues(injectableValues);
}
+ @Test
+ public void testSerializationWithProjections() throws Exception
+ {
+ // arrange
+ final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+ final ShardSpec shardSpec = new NumberedShardSpec(3, 0);
+ final SegmentId segmentId = SegmentId.of("something", interval, "1",
shardSpec);
+
+ final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something",
"or_other");
+ final CompactionState compactionState = new CompactionState(
+ new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+ new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar",
"foo"))),
+ ImmutableList.of(new CountAggregatorFactory("count")),
+ new CompactionTransformSpec(new SelectorDimFilter("dim1", "foo",
null)),
+ MAPPER.convertValue(ImmutableMap.of(), IndexSpec.class),
+ MAPPER.convertValue(ImmutableMap.of(), GranularitySpec.class),
+ null
+ );
+ final DataSegment segment = DataSegment.builder(segmentId)
+ .loadSpec(loadSpec)
+ .dimensions(Arrays.asList("dim1",
"dim2"))
+ .metrics(Arrays.asList("met1",
"met2"))
+ .projections(Arrays.asList("proj1",
"proj2"))
+ .shardSpec(shardSpec)
+
.lastCompactionState(compactionState)
+ .binaryVersion(TEST_VERSION)
+ .size(1)
+ .build();
+ // act & assert
+ final Map<String, Object> objectMap = MAPPER.readValue(
+ MAPPER.writeValueAsString(segment),
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+ Assert.assertEquals(12, objectMap.size());
+ Assert.assertEquals("something", objectMap.get("dataSource"));
+ Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+ Assert.assertEquals("1", objectMap.get("version"));
+ Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+ Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+ Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+ Assert.assertEquals("proj1,proj2", objectMap.get("projections"));
+ Assert.assertEquals(
+ ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions",
0),
+ objectMap.get("shardSpec")
+ );
+ Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+ Assert.assertEquals(1, objectMap.get("size"));
+ Assert.assertEquals(6, ((Map)
objectMap.get("lastCompactionState")).size());
+ // another act & assert
+ DataSegment deserializedSegment =
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
+ assertAllFieldsEquals(segment, deserializedSegment);
+ Assert.assertEquals(0, segment.compareTo(deserializedSegment));
+ Assert.assertEquals(0, deserializedSegment.compareTo(segment));
+ Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
+ }
+
@Test
public void testV1Serialization() throws Exception
{
@@ -156,31 +213,18 @@ public class DataSegmentTest
Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
- Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3,
"partitions", 0), objectMap.get("shardSpec"));
+ Assert.assertEquals(
+ ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions",
0),
+ objectMap.get("shardSpec")
+ );
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(6, ((Map)
objectMap.get("lastCompactionState")).size());
DataSegment deserializedSegment =
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
-
- Assert.assertEquals(segment.getDataSource(),
deserializedSegment.getDataSource());
- Assert.assertEquals(segment.getInterval(),
deserializedSegment.getInterval());
- Assert.assertEquals(segment.getVersion(),
deserializedSegment.getVersion());
- Assert.assertEquals(segment.getLoadSpec(),
deserializedSegment.getLoadSpec());
- Assert.assertEquals(segment.getDimensions(),
deserializedSegment.getDimensions());
- Assert.assertEquals(segment.getMetrics(),
deserializedSegment.getMetrics());
- Assert.assertEquals(segment.getShardSpec(),
deserializedSegment.getShardSpec());
- Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
- Assert.assertEquals(segment.getId(), deserializedSegment.getId());
- Assert.assertEquals(segment.getLastCompactionState(),
deserializedSegment.getLastCompactionState());
-
- deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment),
DataSegment.class);
+ assertAllFieldsEquals(segment, deserializedSegment);
Assert.assertEquals(0, segment.compareTo(deserializedSegment));
-
- deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment),
DataSegment.class);
Assert.assertEquals(0, deserializedSegment.compareTo(segment));
-
- deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment),
DataSegment.class);
Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
}
@@ -251,22 +295,16 @@ public class DataSegmentTest
Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
- Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3,
"partitions", 0), objectMap.get("shardSpec"));
+ Assert.assertEquals(
+ ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions",
0),
+ objectMap.get("shardSpec")
+ );
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
Assert.assertEquals(3, ((Map)
objectMap.get("lastCompactionState")).size());
DataSegment deserializedSegment =
MAPPER.readValue(lastCompactionStateWithNullSpecs, DataSegment.class);
- Assert.assertEquals(segment.getDataSource(),
deserializedSegment.getDataSource());
- Assert.assertEquals(segment.getInterval(),
deserializedSegment.getInterval());
- Assert.assertEquals(segment.getVersion(),
deserializedSegment.getVersion());
- Assert.assertEquals(segment.getLoadSpec(),
deserializedSegment.getLoadSpec());
- Assert.assertEquals(segment.getDimensions(),
deserializedSegment.getDimensions());
- Assert.assertEquals(segment.getMetrics(),
deserializedSegment.getMetrics());
- Assert.assertEquals(segment.getShardSpec(),
deserializedSegment.getShardSpec());
- Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
- Assert.assertEquals(segment.getId(), deserializedSegment.getId());
- Assert.assertEquals(segment.getLastCompactionState(),
deserializedSegment.getLastCompactionState());
+ assertAllFieldsEquals(segment, deserializedSegment);
Assert.assertNotNull(segment.getLastCompactionState());
Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec());
Assert.assertNull(segment.getLastCompactionState().getTransformSpec());
@@ -274,13 +312,8 @@ public class DataSegmentTest
Assert.assertNotNull(deserializedSegment.getLastCompactionState());
Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec());
- deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs,
DataSegment.class);
Assert.assertEquals(0, segment.compareTo(deserializedSegment));
-
- deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs,
DataSegment.class);
Assert.assertEquals(0, deserializedSegment.compareTo(segment));
-
- deserializedSegment = MAPPER.readValue(lastCompactionStateWithNullSpecs,
DataSegment.class);
Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
}
@@ -338,8 +371,12 @@ public class DataSegmentTest
@Test
public void testV1SerializationNullMetrics() throws Exception
{
- final DataSegment segment =
- makeDataSegment("foo", "2012-01-01/2012-01-02",
DateTimes.of("2012-01-01T11:22:33.444Z").toString());
+ final DataSegment segment = DataSegment.builder(SegmentId.of(
+ "foo",
+ Intervals.of("2012-01-01/2012-01-02"),
+ DateTimes.of("2012-01-01T11:22:33.444Z").toString(),
+ null
+ )).size(1).build();
final DataSegment segment2 =
MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
Assert.assertEquals("empty dimensions", ImmutableList.of(),
segment2.getDimensions());
@@ -367,12 +404,12 @@ public class DataSegmentTest
.lastCompactionState(compactionState)
.build();
final DataSegment segment2 = DataSegment.builder()
- .dataSource("foo")
-
.interval(Intervals.of("2012-01-01/2012-01-02"))
-
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
- .shardSpec(getShardSpec(7))
- .size(0)
- .build();
+ .dataSource("foo")
+
.interval(Intervals.of("2012-01-01/2012-01-02"))
+
.version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+ .shardSpec(getShardSpec(7))
+ .size(0)
+ .build();
Assert.assertEquals(segment1,
segment2.withLastCompactionState(compactionState));
}
@@ -409,7 +446,7 @@ public class DataSegmentTest
transformSpec,
indexSpec,
granularitySpec,
- null
+ ImmutableList.of()
);
final DataSegment segment1 = DataSegment.builder()
@@ -476,13 +513,19 @@ public class DataSegmentTest
}
- private DataSegment makeDataSegment(String dataSource, String interval,
String version)
+ private static void assertAllFieldsEquals(DataSegment segment1, DataSegment
segment2)
{
- return DataSegment.builder()
- .dataSource(dataSource)
- .interval(Intervals.of(interval))
- .version(version)
- .size(1)
- .build();
+ Assert.assertEquals(segment1.getDataSource(), segment2.getDataSource());
+ Assert.assertEquals(segment1.getInterval(), segment2.getInterval());
+ Assert.assertEquals(segment1.getVersion(), segment2.getVersion());
+ Assert.assertEquals(segment1.getLoadSpec(), segment2.getLoadSpec());
+ Assert.assertEquals(segment1.getDimensions(), segment2.getDimensions());
+ Assert.assertEquals(segment1.getMetrics(), segment2.getMetrics());
+ Assert.assertEquals(segment1.getProjections(), segment2.getProjections());
+ Assert.assertEquals(segment1.getShardSpec(), segment2.getShardSpec());
+ Assert.assertEquals(segment1.getSize(), segment2.getSize());
+ Assert.assertEquals(segment1.getBinaryVersion(),
segment2.getBinaryVersion());
+ Assert.assertEquals(segment1.getId(), segment2.getId());
+ Assert.assertEquals(segment1.getLastCompactionState(),
segment2.getLastCompactionState());
}
}
diff --git
a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
index cc08f72ab02..5293f939902 100644
---
a/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
+++
b/processing/src/test/java/org/apache/druid/timeline/SegmentStatusInClusterTest.java
@@ -65,19 +65,15 @@ public class SegmentStatusInClusterTest
private static SegmentStatusInCluster createSegmentForTest()
{
- DataSegment dataSegment = new DataSegment(
- "something",
- INTERVAL,
- "1",
- LOAD_SPEC,
- Arrays.asList("dim1", "dim2"),
- Arrays.asList("met1", "met2"),
- NoneShardSpec.instance(),
- null,
- TEST_VERSION,
- 1
- );
-
+ DataSegment dataSegment = DataSegment.builder(SegmentId.of("something",
INTERVAL, "1", null))
+ .shardSpec(NoneShardSpec.instance())
+ .dimensions(Arrays.asList("dim1",
"dim2"))
+ .metrics(Arrays.asList("met1",
"met2"))
+ .projections(Arrays.asList("proj1",
"proj2"))
+ .loadSpec(LOAD_SPEC)
+ .binaryVersion(TEST_VERSION)
+ .size(1)
+ .build();
return new SegmentStatusInCluster(dataSegment, OVERSHADOWED,
REPLICATION_FACTOR, NUM_ROWS, REALTIME);
}
@@ -89,13 +85,14 @@ public class SegmentStatusInClusterTest
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
- Assert.assertEquals(14, objectMap.size());
+ Assert.assertEquals(15, objectMap.size());
Assert.assertEquals("something", objectMap.get("dataSource"));
Assert.assertEquals(INTERVAL.toString(), objectMap.get("interval"));
Assert.assertEquals("1", objectMap.get("version"));
Assert.assertEquals(LOAD_SPEC, objectMap.get("loadSpec"));
Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+ Assert.assertEquals("proj1,proj2", objectMap.get("projections"));
Assert.assertEquals(ImmutableMap.of("type", "none"),
objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
@@ -118,6 +115,7 @@ public class SegmentStatusInClusterTest
Assert.assertEquals(dataSegment.getLoadSpec(),
deserializedSegment.getLoadSpec());
Assert.assertEquals(dataSegment.getDimensions(),
deserializedSegment.getDimensions());
Assert.assertEquals(dataSegment.getMetrics(),
deserializedSegment.getMetrics());
+ Assert.assertEquals(dataSegment.getProjections(),
deserializedSegment.getProjections());
Assert.assertEquals(dataSegment.getShardSpec(),
deserializedSegment.getShardSpec());
Assert.assertEquals(dataSegment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(dataSegment.getId(), deserializedSegment.getId());
@@ -155,11 +153,10 @@ class TestSegment extends DataSegment
@JsonProperty("dimensions")
@JsonDeserialize(using = CommaListJoinDeserializer.class)
@Nullable
- List<String> dimensions,
- @JsonProperty("metrics")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> metrics,
+ List<String> dimensions,
+ @JsonProperty("metrics") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+ @JsonProperty("projections") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable
+ List<String> projections,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState
lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@@ -175,10 +172,12 @@ class TestSegment extends DataSegment
loadSpec,
dimensions,
metrics,
+ projections,
shardSpec,
lastCompactionState,
binaryVersion,
- size
+ size,
+ PruneSpecsHolder.DEFAULT
);
this.overshadowed = overshadowed;
this.replicationFactor = replicationFactor;
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index a2c5956772f..4ff74ea1f44 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1203,18 +1203,11 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
final SegmentId newVersionSegmentId =
pendingSegment.getId().asSegmentId();
newVersionSegmentToParent.put(newVersionSegmentId,
oldSegment.getId());
upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(),
oldSegment.getId().toString());
- allSegmentsToInsert.add(
- new DataSegment(
- pendingSegment.getId().asSegmentId(),
- oldSegment.getLoadSpec(),
- oldSegment.getDimensions(),
- oldSegment.getMetrics(),
- pendingSegment.getId().getShardSpec(),
- oldSegment.getLastCompactionState(),
- oldSegment.getBinaryVersion(),
- oldSegment.getSize()
- )
- );
+ allSegmentsToInsert.add(DataSegment.builder(oldSegment)
+
.interval(newVersionSegmentId.getInterval())
+
.version(newVersionSegmentId.getVersion())
+
.shardSpec(pendingSegment.getId().getShardSpec())
+ .build());
}
}
);
@@ -2548,18 +2541,10 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
Set<DataSegment> segmentsWithAllocationInfo = new HashSet<>();
for (SegmentId id : overlappingSegmentIds) {
final int corePartitions =
versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval());
- segmentsWithAllocationInfo.add(
- new DataSegment(
- id,
- null,
- null,
- null,
- new NumberedShardSpec(id.getPartitionNum(), corePartitions),
- null,
- null,
- 1
- )
- );
+ segmentsWithAllocationInfo.add(DataSegment.builder(id)
+ .shardSpec(new
NumberedShardSpec(id.getPartitionNum(), corePartitions))
+ .size(1)
+ .build());
}
return segmentsWithAllocationInfo;
}
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
index 6c35cb8d391..de951618df2 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
@@ -88,6 +88,7 @@ public class DataSchema
// This is used for backward compatibility
private InputRowParser inputRowParser;
+ @Nullable
private List<AggregateProjectionSpec> projections;
@JsonCreator
@@ -186,7 +187,7 @@ public class DataSchema
/**
* Computes the set of field names that are specified by the provided
dimensions and aggregator lists.
- *
+ * <p>
* If either list is null, it is ignored.
*
* @throws IllegalArgumentException if there are duplicate field names, or
if any dimension or aggregator
@@ -357,11 +358,21 @@ public class DataSchema
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
public List<AggregateProjectionSpec> getProjections()
{
return projections;
}
+ @Nullable
+ public List<String> getProjectionNames()
+ {
+ if (projections == null) {
+ return null;
+ }
+ return
projections.stream().map(AggregateProjectionSpec::getName).collect(Collectors.toList());
+ }
+
@Deprecated
@JsonProperty("parser")
@Nullable
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 396474bd29d..0d41f396f0e 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -205,10 +205,10 @@ public class StreamAppenderator implements Appenderator
/**
* This constructor allows the caller to provide its own
SinkQuerySegmentWalker.
- *
+ * <p>
* The sinkTimeline is set to the sink timeline of the provided
SinkQuerySegmentWalker.
* If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
- *
+ * <p>
* It is used by UnifiedIndexerAppenderatorsManager which allows queries on
data associated with multiple
* Appenderators.
*/
@@ -868,7 +868,6 @@ public class StreamAppenderator implements Appenderator
* @param identifier sink identifier
* @param sink sink to push
* @param useUniquePath true if the segment should be written to a path with
a unique identifier
- *
* @return segment descriptor, or null if the sink is no longer valid
*/
@Nullable
@@ -1115,7 +1114,7 @@ public class StreamAppenderator implements Appenderator
* Unannounces the given base segment and all its upgraded versions.
*
* @param baseSegment base segment
- * @param sink sink corresponding to the base segment
+ * @param sink sink corresponding to the base segment
* @return the set of all segment ids associated with the base segment
containing the upgraded ids and itself.
*/
private Set<SegmentIdWithShardSpec>
unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink)
@@ -1128,17 +1127,15 @@ public class StreamAppenderator implements Appenderator
final Set<SegmentIdWithShardSpec> upgradedVersionsOfSegment =
baseSegmentToUpgradedSegments.remove(baseId);
for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) {
- final DataSegment newSegment = new DataSegment(
- newId.getDataSource(),
- newId.getInterval(),
- newId.getVersion(),
- baseSegment.getLoadSpec(),
- baseSegment.getDimensions(),
- baseSegment.getMetrics(),
- newId.getShardSpec(),
- baseSegment.getBinaryVersion(),
- baseSegment.getSize()
- );
+ final DataSegment newSegment = DataSegment.builder(newId.asSegmentId())
+
.shardSpec(newId.getShardSpec())
+
.loadSpec(baseSegment.getLoadSpec())
+
.dimensions(baseSegment.getDimensions())
+
.metrics(baseSegment.getMetrics())
+
.projections(baseSegment.getProjections())
+
.binaryVersion(baseSegment.getBinaryVersion())
+ .size(baseSegment.getSize())
+ .build();
unannounceSegment(newSegment);
upgradedSegmentToBaseSegment.remove(newId);
}
@@ -1182,17 +1179,15 @@ public class StreamAppenderator implements Appenderator
private DataSegment getUpgradedSegment(DataSegment baseSegment,
SegmentIdWithShardSpec upgradedVersion)
{
- return new DataSegment(
- upgradedVersion.getDataSource(),
- upgradedVersion.getInterval(),
- upgradedVersion.getVersion(),
- baseSegment.getLoadSpec(),
- baseSegment.getDimensions(),
- baseSegment.getMetrics(),
- upgradedVersion.getShardSpec(),
- baseSegment.getBinaryVersion(),
- baseSegment.getSize()
- );
+ return DataSegment.builder(upgradedVersion.asSegmentId())
+ .shardSpec(upgradedVersion.getShardSpec())
+ .loadSpec(baseSegment.getLoadSpec())
+ .dimensions(baseSegment.getDimensions())
+ .metrics(baseSegment.getMetrics())
+ .projections(baseSegment.getProjections())
+ .binaryVersion(baseSegment.getBinaryVersion())
+ .size(baseSegment.getSize())
+ .build();
}
private void lockBasePersistDirectory()
@@ -1433,7 +1428,7 @@ public class StreamAppenderator implements Appenderator
* Update the state of the appenderator when adding a sink.
*
* @param identifier sink identifier
- * @param sink sink to be added
+ * @param sink sink to be added
*/
private void addSink(SegmentIdWithShardSpec identifier, Sink sink)
{
@@ -1622,7 +1617,6 @@ public class StreamAppenderator implements Appenderator
*
* @param indexToPersist hydrant to persist
* @param identifier the segment this hydrant is going to be part of
- *
* @return the number of rows persisted
*/
private int persistHydrant(FireHydrant indexToPersist,
SegmentIdWithShardSpec identifier)
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
index 45803107f18..87b2ab9cc52 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/sink/Sink.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.realtime.sink;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -47,6 +46,7 @@ import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Overshadowable;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;
@@ -249,17 +249,12 @@ public class Sink implements Iterable<FireHydrant>,
Overshadowable<Sink>
public DataSegment getSegment()
{
- return new DataSegment(
- schema.getDataSource(),
- interval,
- version,
- ImmutableMap.of(),
- Collections.emptyList(),
- Lists.transform(Arrays.asList(schema.getAggregators()),
AggregatorFactory::getName),
- shardSpec,
- null,
- 0
- );
+ return DataSegment.builder(SegmentId.of(schema.getDataSource(), interval,
version, shardSpec))
+ .shardSpec(shardSpec)
+ .dimensions(null)
+
.metrics(Lists.transform(Arrays.asList(schema.getAggregators()),
AggregatorFactory::getName))
+ .projections(schema.getProjectionNames())
+ .build();
}
public int getNumRows()
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
index 2a633b6ad27..486bbf20bbb 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java
@@ -47,14 +47,9 @@ public class LoadableDataSegment extends DataSegment
@JsonProperty("version") String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization
to prevent dependency pollution
@JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
- @JsonProperty("dimensions")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> dimensions,
- @JsonProperty("metrics")
- @JsonDeserialize(using = CommaListJoinDeserializer.class)
- @Nullable
- List<String> metrics,
+ @JsonProperty("dimensions") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> dimensions,
+ @JsonProperty("metrics") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> metrics,
+ @JsonProperty("projections") @JsonDeserialize(using =
CommaListJoinDeserializer.class) @Nullable List<String> projections,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState
lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@@ -68,6 +63,7 @@ public class LoadableDataSegment extends DataSegment
loadSpec,
dimensions,
metrics,
+ projections,
shardSpec,
lastCompactionState,
binaryVersion,
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
index c27a0537ae2..9e043d175d8 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientCacheKeyManagerTest.java
@@ -293,16 +293,7 @@ public class CachingClusteredClientCacheKeyManagerTest
extends EasyMockSupport
QueryableDruidServer queryableDruidServer =
mock(QueryableDruidServer.class);
DruidServer server = mock(DruidServer.class);
SegmentId segmentId = SegmentId.dummy("data-source", partitionNumber);
- DataSegment segment = new DataSegment(
- segmentId,
- null,
- null,
- null,
- new NumberedShardSpec(partitionNumber, 10),
- null,
- 0,
- 0
- );
+ DataSegment segment = DataSegment.builder(segmentId).shardSpec(new
NumberedShardSpec(partitionNumber, 10)).build();
expect(server.isSegmentReplicationTarget()).andReturn(isHistorical).anyTimes();
expect(serverSelector.pick(query,
CloneQueryMode.EXCLUDECLONES)).andReturn(queryableDruidServer).anyTimes();
expect(queryableDruidServer.getServer()).andReturn(server).anyTimes();
diff --git
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
index d0ae542614c..1318a7c344a 100644
---
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
+++
b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java
@@ -1732,25 +1732,18 @@ public class CachingClusteredClientTest
int partitions
)
{
- final DataSegment segment = new DataSegment(
- SegmentId.dummy(DATA_SOURCE),
- null,
- null,
- null,
- new HashBasedNumberedShardSpec(
- partitionNum,
- partitions,
- partitionNum,
- partitions,
- partitionDimensions,
- partitionFunction,
- TestHelper.makeJsonMapper()
- ),
- null,
- 9,
- 0L
- );
-
+ final DataSegment segment =
DataSegment.builder(SegmentId.dummy(DATA_SOURCE))
+ .shardSpec(new
HashBasedNumberedShardSpec(
+ partitionNum,
+ partitions,
+ partitionNum,
+ partitions,
+ partitionDimensions,
+ partitionFunction,
+ TestHelper.makeJsonMapper()
+ ))
+ .binaryVersion(9)
+ .build();
ServerSelector selector = new ServerSelector(
segment,
new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy()),
@@ -1768,22 +1761,15 @@ public class CachingClusteredClientTest
int partitionNum
)
{
- final DataSegment segment = new DataSegment(
- SegmentId.dummy(DATA_SOURCE),
- null,
- null,
- null,
- new SingleDimensionShardSpec(
- dimension,
- start,
- end,
- partitionNum,
- SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
- ),
- null,
- 9,
- 0L
- );
+ final DataSegment segment =
DataSegment.builder(SegmentId.dummy(DATA_SOURCE))
+ .shardSpec(new
SingleDimensionShardSpec(dimension,
+
start,
+
end,
+
partitionNum,
+
SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS
+ ))
+ .binaryVersion(9)
+ .build();
ServerSelector selector = new ServerSelector(
segment,
@@ -2457,13 +2443,13 @@ public class CachingClusteredClientTest
(DateTime) objects[i],
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
- .put("rows", objects[i + 1])
- .put("imps", objects[i + 2])
- .put("impers", objects[i + 2])
- .put("avg_imps_per_row", avg_impr)
- .put("avg_imps_per_row_half", avg_impr / 2)
- .put("avg_imps_per_row_double", avg_impr * 2)
- .build()
+ .put("rows", objects[i + 1])
+ .put("imps", objects[i + 2])
+ .put("impers", objects[i + 2])
+ .put("avg_imps_per_row", avg_impr)
+ .put("avg_imps_per_row_half", avg_impr / 2)
+ .put("avg_imps_per_row_double", avg_impr * 2)
+ .build()
)
)
);
@@ -2530,14 +2516,14 @@ public class CachingClusteredClientTest
final double rows = ((Number) objects[index + 1]).doubleValue();
values.add(
ImmutableMap.<String, Object>builder()
- .put(names.get(0), objects[index])
- .put(names.get(1), rows)
- .put(names.get(2), imps)
- .put(names.get(3), imps)
- .put(names.get(4), imps / rows)
- .put(names.get(5), ((imps * 2) / rows))
- .put(names.get(6), (imps / (rows * 2)))
- .build()
+ .put(names.get(0), objects[index])
+ .put(names.get(1), rows)
+ .put(names.get(2), imps)
+ .put(names.get(3), imps)
+ .put(names.get(4), imps / rows)
+ .put(names.get(5), ((imps * 2) / rows))
+ .put(names.get(6), (imps / (rows * 2)))
+ .build()
);
index += 3;
}
diff --git
a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
index 36bf4e688ef..f895198fd69 100644
---
a/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
+++
b/server/src/test/java/org/apache/druid/client/ImmutableDruidDataSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.test.utils.ImmutableDruidDataSourceTestUtils;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
+import org.apache.druid.timeline.SegmentId;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -38,6 +39,18 @@ import java.io.IOException;
public class ImmutableDruidDataSourceTest
{
+ private static final DataSegment TEST_SEGMENT =
DataSegment.builder(SegmentId.of(
+ "test",
+
Intervals.of("2017/2018"),
+ "version",
+ null
+ ))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+
.metrics(ImmutableList.of("met1", "met2"))
+
.projections(ImmutableList.of("proj1", "proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -45,8 +58,7 @@ public class ImmutableDruidDataSourceTest
@Test
public void testSerde() throws IOException
{
- final DataSegment segment = getTestSegment();
- final ImmutableDruidDataSource dataSource =
getImmutableDruidDataSource(segment);
+ final ImmutableDruidDataSource dataSource =
getImmutableDruidDataSource(TEST_SEGMENT);
final ObjectMapper objectMapper = new DefaultObjectMapper()
.setInjectableValues(new Std().addValue(PruneSpecsHolder.class,
PruneSpecsHolder.DEFAULT));
@@ -59,13 +71,10 @@ public class ImmutableDruidDataSourceTest
@Test
public void testEqualsMethodThrowsUnsupportedOperationException()
{
- final DataSegment segment1 = getTestSegment();
+ final ImmutableDruidDataSource dataSource1 =
getImmutableDruidDataSource(TEST_SEGMENT);
- final ImmutableDruidDataSource dataSource1 =
getImmutableDruidDataSource(segment1);
- final DataSegment segment2 = getTestSegment();
-
- final ImmutableDruidDataSource dataSource2 =
getImmutableDruidDataSource(segment2);
+ final ImmutableDruidDataSource dataSource2 =
getImmutableDruidDataSource(TEST_SEGMENT);
Assert.assertThrows(
"ImmutableDruidDataSource shouldn't be used as the key in containers",
@@ -74,37 +83,20 @@ public class ImmutableDruidDataSourceTest
);
}
- private ImmutableDruidDataSource getImmutableDruidDataSource(DataSegment
segment1)
+ private static ImmutableDruidDataSource
getImmutableDruidDataSource(DataSegment segment1)
{
return new ImmutableDruidDataSource(
- "test",
- ImmutableMap.of("prop1", "val1", "prop2", "val2"),
- ImmutableSortedMap.of(segment1.getId(), segment1)
- );
- }
-
- private DataSegment getTestSegment()
- {
- return new DataSegment(
"test",
- Intervals.of("2017/2018"),
- "version",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- null,
- null,
- 1,
- 100L,
- PruneSpecsHolder.DEFAULT
+ ImmutableMap.of("prop1", "val1", "prop2", "val2"),
+ ImmutableSortedMap.of(segment1.getId(), segment1)
);
}
+
@Test
public void testHashCodeMethodThrowsUnsupportedOperationException()
{
- final DataSegment segment = getTestSegment();
- final ImmutableDruidDataSource dataSource =
getImmutableDruidDataSource(segment);
+ final ImmutableDruidDataSource dataSource =
getImmutableDruidDataSource(TEST_SEGMENT);
Assert.assertThrows(
"ImmutableDruidDataSource shouldn't be used as the key in containers",
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 035cf13bb26..100436c9d65 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.druid.common.utils.IdUtilsTest;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -45,6 +46,7 @@ import
org.apache.druid.java.util.common.granularity.DurationGranularity;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -557,6 +559,7 @@ public class DataSchemaTest extends
InitializedNullHandlingTest
@Test
public void testSerde() throws Exception
{
+ // deserialize, then serialize, then deserialize of DataSchema.
String jsonStr = "{"
+ "\"dataSource\":\"" +
StringEscapeUtils.escapeJson(IdUtilsTest.VALID_ID_CHARS) + "\","
+ "\"parser\":{"
@@ -582,30 +585,63 @@ public class DataSchemaTest extends
InitializedNullHandlingTest
DataSchema.class
);
- Assert.assertEquals(actual.getDataSource(), IdUtilsTest.VALID_ID_CHARS);
+ Assert.assertEquals(IdUtilsTest.VALID_ID_CHARS, actual.getDataSource());
Assert.assertEquals(
- actual.getParser().getParseSpec(),
new JSONParseSpec(
new TimestampSpec("xXx", null, null),
DimensionsSpec.builder().setDimensionExclusions(Arrays.asList("__time",
"metric1", "xXx", "col1")).build(),
null,
null,
null
- )
+ ),
+ actual.getParser().getParseSpec()
);
Assert.assertArrayEquals(
- actual.getAggregators(),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("metric1", "col1")
- }
+ },
+ actual.getAggregators()
);
Assert.assertEquals(
- actual.getGranularitySpec(),
new ArbitraryGranularitySpec(
new DurationGranularity(86400000, null),
ImmutableList.of(Intervals.of("2014/2015"))
- )
+ ),
+ actual.getGranularitySpec()
+ );
+ Assert.assertNull(actual.getProjections());
+ }
+
+ @Test
+ public void testSerdeWithProjections() throws Exception
+ {
+ // serialize, then deserialize of DataSchema with projections.
+ AggregateProjectionSpec projectionSpec = new AggregateProjectionSpec(
+ "ab_count_projection",
+ null,
+ Arrays.asList(
+ new StringDimensionSchema("a"),
+ new LongDimensionSchema("b")
+ ),
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count")
+ }
);
+ DataSchema original = DataSchema.builder()
+ .withDataSource("datasource")
+ .withTimestamp(new TimestampSpec(null,
null, null))
+ .withDimensions(DimensionsSpec.EMPTY)
+ .withAggregators(new
CountAggregatorFactory("rows"))
+
.withProjections(ImmutableList.of(projectionSpec))
+ .withGranularity(ARBITRARY_GRANULARITY)
+ .build();
+ DataSchema serdeResult =
jsonMapper.readValue(jsonMapper.writeValueAsString(original), DataSchema.class);
+
+ Assert.assertEquals("datasource", serdeResult.getDataSource());
+ Assert.assertArrayEquals(new AggregatorFactory[]{new
CountAggregatorFactory("rows")}, serdeResult.getAggregators());
+ Assert.assertEquals(ImmutableList.of(projectionSpec),
serdeResult.getProjections());
+ Assert.assertEquals(ImmutableList.of("ab_count_projection"),
serdeResult.getProjectionNames());
+ Assert.assertEquals(jsonMapper.writeValueAsString(original),
jsonMapper.writeValueAsString(serdeResult));
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
index 89792d85b82..99986d8d94a 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
@@ -62,7 +61,6 @@ import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
@@ -495,21 +493,13 @@ public class CoordinatorSegmentDataCacheConcurrencyTest
extends SegmentMetadataC
);
}
- private DataSegment newSegment(int partitionId)
+ private static DataSegment newSegment(int partitionId)
{
- return new DataSegment(
- DATASOURCE,
- Intervals.of("2012/2013"),
- "version1",
- null,
- ImmutableList.of(),
- ImmutableList.of(),
- new NumberedShardSpec(partitionId, 0),
- null,
- 1,
- 100L,
- PruneSpecsHolder.DEFAULT
- );
+ return DataSegment.builder(SegmentId.of(DATASOURCE,
Intervals.of("2012/2013"), "version1", partitionId))
+ .shardSpec(new NumberedShardSpec(partitionId, 0))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
}
private QueryableIndex newQueryableIndex(int partitionId)
diff --git
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
index e8380b2f303..89bd878c8c6 100644
---
a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
+++
b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheTestBase.java
@@ -50,6 +50,7 @@ import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.junit.Rule;
@@ -255,19 +256,14 @@ public abstract class SegmentMetadataCacheTestBase
extends InitializedNullHandli
.size(0)
.build();
- realtimeSegment1 = new DataSegment(
- DATASOURCE3,
- Intervals.of("2012/2013"),
- "version3",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- new NumberedShardSpec(2, 3),
- null,
- 1,
- 100L,
- DataSegment.PruneSpecsHolder.DEFAULT
- );
+ realtimeSegment1 = DataSegment.builder(SegmentId.of(DATASOURCE3,
Intervals.of("2012/2013"), "version3", null))
+ .shardSpec(new NumberedShardSpec(2, 3))
+ .dimensions(ImmutableList.of("dim1", "dim2"))
+ .metrics(ImmutableList.of("met1", "met2"))
+ .projections(ImmutableList.of("proj1",
"proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
}
public void tearDown() throws Exception
@@ -302,18 +298,13 @@ public abstract class SegmentMetadataCacheTestBase
extends InitializedNullHandli
public DataSegment newSegment(String datasource, int partitionId)
{
- return new DataSegment(
- datasource,
- Intervals.of("2012/2013"),
- "version1",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- new NumberedShardSpec(partitionId, 0),
- null,
- 1,
- 100L,
- DataSegment.PruneSpecsHolder.DEFAULT
- );
+ return DataSegment.builder(SegmentId.of(datasource,
Intervals.of("2012/2013"), "version1", partitionId))
+ .shardSpec(new NumberedShardSpec(partitionId, 0))
+ .dimensions(ImmutableList.of("dim1", "dim2"))
+ .metrics(ImmutableList.of("met1", "met2"))
+ .projections(ImmutableList.of("proj1", "proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
}
}
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 4bf3a8dc22b..0eb46886e58 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -43,7 +43,6 @@ import
org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -424,16 +423,14 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
Set<DataSegment> allSegments = new HashSet<>(segmentsToPublish);
int id = 0;
for (DataSegment segment : segmentsToPublish) {
- DataSegment upgradedSegment = new DataSegment(
- SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION,
id),
- segment.getLoadSpec(),
- segment.getDimensions(),
- segment.getMetrics(),
- new NumberedShardSpec(id, 0),
- null,
- segment.getBinaryVersion(),
- segment.getSize()
- );
+ DataSegment upgradedSegment = DataSegment.builder(segment)
+ .shardSpec(new
NumberedShardSpec(id, 0))
+ .dataSource(DATA_SOURCE)
+ .interval(Intervals.ETERNITY)
+ .version(UPGRADED_VERSION)
+ .lastCompactionState(null)
+ .build();
+
id++;
allSegments.add(upgradedSegment);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
index c3e04f04d6d..47293d8cdda 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -570,21 +571,19 @@ public class BatchDataSegmentAnnouncerTest
}
}
- private DataSegment makeSegment(int offset, boolean isTombstone)
+ private static DataSegment makeSegment(int offset, boolean isTombstone)
{
- DataSegment.Builder builder = DataSegment.builder();
- builder.dataSource("foo")
- .interval(
- new Interval(
- DateTimes.of("2013-01-01").plusDays(offset),
- DateTimes.of("2013-01-02").plusDays(offset)
- )
- )
- .version(DateTimes.nowUtc().toString())
- .dimensions(ImmutableList.of("dim1", "dim2"))
- .metrics(ImmutableList.of("met1", "met2"))
- .loadSpec(ImmutableMap.of("type", "local"))
- .size(0);
+ Interval interval = new Interval(
+ DateTimes.of("2013-01-01").plusDays(offset),
+ DateTimes.of("2013-01-02").plusDays(offset)
+ );
+ SegmentId segmentId = SegmentId.of("foo", interval,
DateTimes.nowUtc().toString(), null);
+ DataSegment.Builder builder = DataSegment.builder(segmentId)
+ .loadSpec(ImmutableMap.of("type",
"local"))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+ .metrics(ImmutableList.of("met1",
"met2"))
+
.projections(ImmutableList.of("proj1", "proj2"))
+ .size(0);
if (isTombstone) {
builder.loadSpec(Collections.singletonMap("type",
DataSegment.TOMBSTONE_LOADSPEC_TYPE));
}
@@ -592,7 +591,7 @@ public class BatchDataSegmentAnnouncerTest
return builder.build();
}
- private DataSegment makeSegment(int offset)
+ private static DataSegment makeSegment(int offset)
{
return makeSegment(offset, false);
}
diff --git a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
index ece46fd11eb..f6c6e510f64 100644
--- a/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
+++ b/services/src/main/java/org/apache/druid/cli/ExportMetadata.java
@@ -431,17 +431,7 @@ public class ExportMetadata extends GuiceRunnable
}
if (newLoadSpec != null) {
- segment = new DataSegment(
- segment.getDataSource(),
- segment.getInterval(),
- segment.getVersion(),
- newLoadSpec,
- segment.getDimensions(),
- segment.getMetrics(),
- segment.getShardSpec(),
- segment.getBinaryVersion(),
- segment.getSize()
- );
+ segment = DataSegment.builder(segment).loadSpec(newLoadSpec).build();
}
String serialized = JSON_MAPPER.writeValueAsString(segment);
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 834a7dd217d..de32eb8f259 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -156,6 +156,7 @@ public class SystemSchema extends AbstractSchema
.add("shard_spec", ColumnType.STRING)
.add("dimensions", ColumnType.STRING)
.add("metrics", ColumnType.STRING)
+ .add("projections", ColumnType.STRING)
.add("last_compaction_state", ColumnType.STRING)
.add("replication_factor", ColumnType.LONG)
.build();
@@ -173,6 +174,7 @@ public class SystemSchema extends AbstractSchema
SEGMENTS_SIGNATURE.indexOf("shard_spec"),
SEGMENTS_SIGNATURE.indexOf("dimensions"),
SEGMENTS_SIGNATURE.indexOf("metrics"),
+ SEGMENTS_SIGNATURE.indexOf("projections"),
SEGMENTS_SIGNATURE.indexOf("last_compaction_state")
}
);
@@ -372,6 +374,7 @@ public class SystemSchema extends AbstractSchema
segment.getShardSpec(),
segment.getDimensions(),
segment.getMetrics(),
+ segment.getProjections(),
segment.getLastCompactionState(),
// If the segment is unpublished, we won't have this
information yet.
// If the value is null, the load rules might have not
evaluated yet, and we don't know the replication factor.
@@ -411,6 +414,7 @@ public class SystemSchema extends AbstractSchema
segment.getShardSpec(),
segment.getDimensions(),
segment.getMetrics(),
+ segment.getProjections(),
null, // unpublished segments from realtime tasks will not be
compacted yet
REPLICATION_FACTOR_UNKNOWN // If the segment is unpublished,
we won't have this information yet.
};
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
index e74c1c50776..599ad73b200 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
@@ -262,7 +261,10 @@ public class BrokerSegmentMetadataCacheConcurrencyTest
extends BrokerSegmentMeta
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
- new PhysicalDatasourceMetadataFactory(new
MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), segmentManager),
+ new PhysicalDatasourceMetadataFactory(
+ new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
+ segmentManager
+ ),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
)
@@ -414,21 +416,13 @@ public class BrokerSegmentMetadataCacheConcurrencyTest
extends BrokerSegmentMeta
);
}
- private DataSegment newSegment(int partitionId)
+ private static DataSegment newSegment(int partitionId)
{
- return new DataSegment(
- DATASOURCE,
- Intervals.of("2012/2013"),
- "version1",
- null,
- ImmutableList.of(),
- ImmutableList.of(),
- new NumberedShardSpec(partitionId, 0),
- null,
- 1,
- 100L,
- DataSegment.PruneSpecsHolder.DEFAULT
- );
+ return DataSegment.builder(SegmentId.of(DATASOURCE,
Intervals.of("2012/2013"), "version1", null))
+ .shardSpec(new NumberedShardSpec(partitionId, 0))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
}
private QueryableIndex newQueryableIndex(int partitionId)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index f9245a492e9..d0e5c012398 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -897,19 +897,19 @@ public class BrokerSegmentMetadataCacheTest extends
BrokerSegmentMetadataCacheTe
markDataSourceLatch = new CountDownLatch(1);
refreshLatch = new CountDownLatch(1);
- final DataSegment someNewBrokerSegment = new DataSegment(
- "foo",
- Intervals.of("2012/2013"),
- "version1",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- new NumberedShardSpec(2, 3),
- null,
- 1,
- 100L,
- DataSegment.PruneSpecsHolder.DEFAULT
- );
+ final DataSegment someNewBrokerSegment = DataSegment.builder(SegmentId.of(
+ "foo",
+
Intervals.of("2012/2013"),
+ "version1",
+ null
+ ))
+ .shardSpec(new
NumberedShardSpec(2, 3))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+
.metrics(ImmutableList.of("met1", "met2"))
+
.projections(ImmutableList.of("proj1", "proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
segmentDataSourceNames.add("foo");
joinableDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
@@ -964,19 +964,19 @@ public class BrokerSegmentMetadataCacheTest extends
BrokerSegmentMetadataCacheTe
markDataSourceLatch = new CountDownLatch(1);
refreshLatch = new CountDownLatch(1);
- final DataSegment someNewBrokerSegment = new DataSegment(
- "foo",
- Intervals.of("2012/2013"),
- "version1",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- new NumberedShardSpec(2, 3),
- null,
- 1,
- 100L,
- DataSegment.PruneSpecsHolder.DEFAULT
- );
+ final DataSegment someNewBrokerSegment = DataSegment.builder(SegmentId.of(
+ "foo",
+
Intervals.of("2012/2013"),
+ "version1",
+ null
+ ))
+ .shardSpec(new
NumberedShardSpec(2, 3))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+
.metrics(ImmutableList.of("met1", "met2"))
+
.projections(ImmutableList.of("proj1", "proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
segmentDataSourceNames.add("foo");
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 34f08c64574..3d005c0426f 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import junitparams.converters.Nullable;
import org.apache.calcite.DataContext;
@@ -45,6 +44,8 @@ import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
@@ -69,7 +70,6 @@ import
org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import
org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.IndexBuilder;
@@ -109,6 +109,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentStatusInCluster;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.joda.time.DateTime;
@@ -155,6 +156,17 @@ public class SystemSchemaTest extends CalciteTestBase
TestDataBuilder.createRow(ImmutableMap.of("t", "2001-01-02", "m1",
"8.0", "dim3", ImmutableList.of("xyz")))
);
+ private static final ShardSpec SHARD_SPEC = new NumberedShardSpec(0, 1);
+ private static final IncrementalIndexSchema SCHEMA = new
IncrementalIndexSchema.Builder()
+ .withDimensionsSpec(new DimensionsSpec(ImmutableList.of(new
StringDimensionSchema("dim1"))))
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new DoubleSumAggregatorFactory("m1", "m1"),
+ new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+ )
+ .withRollup(false)
+ .build();
+
private SystemSchema schema;
private SpecificSegmentsQuerySegmentWalker walker;
private DruidLeaderClient client;
@@ -208,46 +220,27 @@ public class SystemSchemaTest extends CalciteTestBase
final QueryableIndex index1 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new
IncrementalIndexSchema.Builder()
- .withMetrics(
- new
CountAggregatorFactory("cnt"),
- new
DoubleSumAggregatorFactory("m1", "m1"),
- new
HyperUniquesAggregatorFactory("unique_dim1", "dim1")
- )
- .withRollup(false)
- .build()
- )
+ .schema(SCHEMA)
.rows(ROWS1)
.buildMMappedIndex();
final QueryableIndex index2 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "2"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new
IncrementalIndexSchema.Builder()
- .withMetrics(new
LongSumAggregatorFactory("m1", "m1"))
- .withRollup(false)
- .build()
- )
+ .schema(SCHEMA)
.rows(ROWS2)
.buildMMappedIndex();
final QueryableIndex index3 = IndexBuilder.create()
.tmpDir(new File(tmpDir, "3"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
- .schema(
- new
IncrementalIndexSchema.Builder()
- .withMetrics(new
LongSumAggregatorFactory("m1", "m1"))
- .withRollup(false)
- .build()
- )
+ .schema(SCHEMA)
.rows(ROWS3)
.buildMMappedIndex();
walker = SpecificSegmentsQuerySegmentWalker.createWalker(conglomerate)
- .add(segment1, index1)
- .add(segment2, index2)
- .add(segment3, index3);
+ .add(segment1, index1)
+ .add(segment2, index2)
+ .add(segment3, index3);
BrokerSegmentMetadataCache cache = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
@@ -316,30 +309,31 @@ public class SystemSchemaTest extends CalciteTestBase
1,
83000L
);
- private final DataSegment publishedUncompactedSegment3 = new DataSegment(
- "wikipedia3",
- Intervals.of("2009/2010"),
- "version3",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- null,
- null,
- 1,
- 47000L
- );
-
- private final DataSegment segment1 = new DataSegment(
- "test1",
- Intervals.of("2010/2011"),
- "version1",
- null,
- ImmutableList.of("dim1", "dim2"),
- ImmutableList.of("met1", "met2"),
- null,
- 1,
- 100L
- );
+ private final DataSegment publishedUncompactedSegment3 =
DataSegment.builder(SegmentId.of(
+
"wikipedia3",
+
Intervals.of("2009/2010"),
+
"version3",
+ null
+ ))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+
.metrics(ImmutableList.of("met1", "met2"))
+
.projections(ImmutableList.of())
+
.binaryVersion(1)
+
.size(47000L)
+ .build();
+
+ private final DataSegment segment1 = DataSegment.builder(SegmentId.of(
+ "test1",
+
Intervals.of("2010/2011"),
+ "version1",
+ null
+ ))
+
.dimensions(ImmutableList.of("dim1", "dim2"))
+
.metrics(ImmutableList.of("met1", "met2"))
+
.projections(ImmutableList.of("proj1", "proj2"))
+ .binaryVersion(1)
+ .size(100L)
+ .build();
private final DataSegment segment2 = new DataSegment(
"test2",
Intervals.of("2011/2012"),
@@ -547,7 +541,7 @@ public class SystemSchemaTest extends CalciteTestBase
final RelDataType rowType = segmentsTable.getRowType(new
JavaTypeFactoryImpl());
final List<RelDataTypeField> fields = rowType.getFieldList();
- Assert.assertEquals(19, fields.size());
+ Assert.assertEquals(20, fields.size());
final SystemSchema.TasksTable tasksTable = (SystemSchema.TasksTable)
schema.getTableMap().get("tasks");
final RelDataType sysRowType = tasksTable.getRowType(new
JavaTypeFactoryImpl());
@@ -585,138 +579,104 @@ public class SystemSchemaTest extends CalciteTestBase
rows.sort((Object[] row1, Object[] row2) -> ((Comparable)
row1[0]).compareTo(row2[0]));
// total segments = 8
- // segments test1, test2 are published and available
- // segment test3 is served by historical but unpublished or unused
- // segments test4, test5 are not published but available (realtime
segments)
- // segment test2 is both published and served by a realtime server.
-
Assert.assertEquals(8, rows.size());
-
- verifyRow(
- rows.get(0),
- "test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1",
- 100L,
- 0L, //partition_num
- 1L, //num_replicas
- 3L, //numRows
- 1L, //is_published
- 1L, //is_available
- 0L, //is_realtime
- 1L, //is_overshadowed
- null, //is_compacted
- 2L // replication_factor
- );
-
- verifyRow(
- rows.get(1),
- "test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2",
- 100L,
- 0L, //partition_num
- 2L, //x§segment test2 is served by historical and realtime servers
- 3L, //numRows
- 1L, //is_published
- 1L, //is_available
- 0L, //is_realtime
- 0L, //is_overshadowed,
- null, //is_compacted
- 0L // replication_factor
- );
-
- //segment test3 is unpublished and has a NumberedShardSpec with
partitionNum = 2
- verifyRow(
- rows.get(2),
- "test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2",
- 100L,
- 2L, //partition_num
- 1L, //num_replicas
- 2L, //numRows
- 0L, //is_published
- 1L, //is_available
- 0L, //is_realtime
- 0L, //is_overshadowed
- null, //is_compacted
- -1L // replication_factor
- );
-
- verifyRow(
- rows.get(3),
- "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4",
- 100L,
- 0L, //partition_num
- 1L, //num_replicas
- 0L, //numRows
- 0L, //is_published
- 1L, //is_available
- 1L, //is_realtime
- 0L, //is_overshadowed
- null, //is_compacted
- -1L // replication_factor
- );
-
- verifyRow(
- rows.get(4),
- "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5",
- 100L,
- 0L, //partition_num
- 1L, //num_replicas
- 0L, //numRows
- 0L, //is_published
- 1L, //is_available
- 1L, //is_realtime
- 0L, //is_overshadowed
- null, //is_compacted
- -1L // replication_factor
- );
-
- // wikipedia segments are published and unavailable, num_replicas is 0
- // wikipedia segment 1 and 2 are compacted while 3 are not compacted
- verifyRow(
- rows.get(5),
-
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
- 53000L,
- 0L, //partition_num
- 0L, //num_replicas
- 0L, //numRows
- 1L, //is_published
- 0L, //is_available
- 0L, //is_realtime
- 1L, //is_overshadowed
- expectedCompactionState, //is_compacted
- 2L // replication_factor
- );
-
- verifyRow(
- rows.get(6),
-
"wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2",
- 83000L,
- 0L, //partition_num
- 0L, //num_replicas
- 0L, //numRows
- 1L, //is_published
- 0L, //is_available
- 0L, //is_realtime
- 0L, //is_overshadowed
- expectedCompactionState, //is_compacted
- 0L // replication_factor
- );
-
- verifyRow(
- rows.get(7),
-
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3",
- 47000L,
- 0L, //partition_num
- 0L, //num_replicas
- 0L, //numRows
- 1L, //is_published
- 0L, //is_available
- 0L, //is_realtime
- 0L, //is_overshadowed
- null, //is_compacted
- 2L // replication_factor
- );
-
// Verify value types.
verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
+
+ // segments test1, test2 are published and available.
+ Object[] segment1Expected = new Object[]{
+ // segment_id, datasource
+ "test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1",
"test1",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2010-01-01T00:00:00.000Z", "2011-01-01T00:00:00.000Z", 100L,
"version1", 0L, 1L, 3L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 0L, 1L, 1L, 0L, 1L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", "[\"proj1\",\"proj2\"]",
null, 2L
+ };
+ Assert.assertArrayEquals(segment1Expected, rows.get(0));
+ Object[] segment2Expected = new Object[]{
+ // segment_id, datasource
+ "test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2",
"test2",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2011-01-01T00:00:00.000Z", "2012-01-01T00:00:00.000Z", 100L,
"version2", 0L, 2L, 3L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 1L, 1L, 1L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, 0L
+ };
+ Assert.assertArrayEquals(segment2Expected, rows.get(1));
+ // segment test3 is unpublished and has a NumberedShardSpec with
partitionNum = 2; it is served by a historical but is unpublished or unused
+ Object[] segment3Expected = new Object[]{
+ // segment_id, datasource
+ "test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3_2",
"test3",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2012-01-01T00:00:00.000Z", "2013-01-01T00:00:00.000Z", 100L,
"version3", 2L, 1L, 2L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 0L, 0L, 1L, 0L, 0L, MAPPER.writeValueAsString(new NumberedShardSpec(2,
3)),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+ };
+ Assert.assertArrayEquals(segment3Expected, rows.get(2));
+ // segments test4, test5 are not published but available (realtime
segments)
+ Object[] segment4Expected = new Object[]{
+ // segment_id, datasource
+ "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4",
"test4",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2014-01-01T00:00:00.000Z", "2015-01-01T00:00:00.000Z", 100L,
"version4", 0L, 1L, 0L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 1L, 0L, 1L, 1L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+ };
+ Assert.assertArrayEquals(segment4Expected, rows.get(3));
+ Object[] segment5Expected = new Object[]{
+ // segment_id, datasource
+ "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5",
"test5",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2015-01-01T00:00:00.000Z", "2016-01-01T00:00:00.000Z", 100L,
"version5", 0L, 1L, 0L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 1L, 0L, 1L, 1L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null, null, -1L
+ };
+ Assert.assertArrayEquals(segment5Expected, rows.get(4));
+
+ // wikipedia segment 1 and segment 2 are published and unavailable and
compacted, num_replicas is 0
+ Object[] wikiSegment1Expected = new Object[]{
+ // segment_id, datasource
+
"wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1",
"wikipedia1",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2007-01-01T00:00:00.000Z", "2008-01-01T00:00:00.000Z", 53_000L,
"version1", 0L, 0L, 0L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 0L, 1L, 0L, 0L, 1L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null,
MAPPER.writeValueAsString(expectedCompactionState), 2L
+ };
+ Assert.assertArrayEquals(wikiSegment1Expected, rows.get(5));
+ Object[] wikiSegment2Expected = new Object[]{
+ // segment_id, datasource
+
"wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2",
"wikipedia2",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2008-01-01T00:00:00.000Z", "2009-01-01T00:00:00.000Z", 83_000L,
"version2", 0L, 0L, 0L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 1L, 1L, 0L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", null,
MAPPER.writeValueAsString(expectedCompactionState), 0L
+ };
+ Assert.assertArrayEquals(wikiSegment2Expected, rows.get(6));
+ // wikipedia segment 3 is not compacted, and is projection aware.
+ Object[] wikiSegment3Expected = new Object[]{
+ // segment_id, datasource
+
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3",
"wikipedia3",
+ // start, end, size, version, partition_num, num_replicas, numRows
+ "2009-01-01T00:00:00.000Z", "2010-01-01T00:00:00.000Z", 47_000L,
"version3", 0L, 0L, 0L,
+ // is_active, is_published, is_available, is_realtime,
is_overshadowed, shard_spec
+ 1L, 1L, 0L, 0L, 0L, MAPPER.writeValueAsString(SHARD_SPEC),
+ // dimensions, metrics, projections, last_compaction_state,
replication_factor
+ "[\"dim1\",\"dim2\"]", "[\"met1\",\"met2\"]", "[]", null, 2L
+ };
+ Assert.assertArrayEquals(wikiSegment3Expected, rows.get(7));
}
@Test
@@ -787,44 +747,6 @@ public class SystemSchemaTest extends CalciteTestBase
);
}
- private void verifyRow(
- Object[] row,
- String segmentId,
- long size,
- long partitionNum,
- long numReplicas,
- long numRows,
- long isPublished,
- long isAvailable,
- long isRealtime,
- long isOvershadowed,
- CompactionState compactionState,
- long replicationFactor
- ) throws Exception
- {
- Assert.assertEquals(segmentId, row[0].toString());
- SegmentId id =
Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
- Assert.assertEquals(id.getDataSource(), row[1]);
- Assert.assertEquals(id.getIntervalStart().toString(), row[2]);
- Assert.assertEquals(id.getIntervalEnd().toString(), row[3]);
- Assert.assertEquals(size, row[4]);
- Assert.assertEquals(id.getVersion(), row[5]);
- Assert.assertEquals(partitionNum, row[6]);
- Assert.assertEquals(numReplicas, row[7]);
- Assert.assertEquals(numRows, row[8]);
- Assert.assertEquals((((isPublished == 1) && (isOvershadowed == 0)) ||
(isRealtime == 1)) ? 1L : 0L, row[9]);
- Assert.assertEquals(isPublished, row[10]);
- Assert.assertEquals(isAvailable, row[11]);
- Assert.assertEquals(isRealtime, row[12]);
- Assert.assertEquals(isOvershadowed, row[13]);
- if (compactionState == null) {
- Assert.assertNull(row[17]);
- } else {
- Assert.assertEquals(MAPPER.writeValueAsString(compactionState), row[17]);
- }
- Assert.assertEquals(replicationFactor, row[18]);
- }
-
@Test
public void testServersTable() throws URISyntaxException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]