This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3538abd Make sure all fields in sys.segments are JSON-serialized (#10481)
3538abd is described below
commit 3538abd5d064d11a03cea6458782e5f4b52d3c45
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Oct 14 13:49:46 2020 -0700
Make sure all fields in sys.segments are JSON-serialized (#10481)
* fix JSON format
* Change all columns in sys segments to be JSON
* Change all columns in sys segments to be JSON
* add tests
* fix failing tests
* fix failing tests
---
docs/querying/sql.md | 8 +-
.../results/auth_test_sys_schema_segments.json | 6 +-
.../druid/sql/calcite/schema/SystemSchema.java | 98 +++++++++++++---------
.../druid/sql/calcite/schema/SystemSchemaTest.java | 19 ++---
website/.spelling | 1 +
5 files changed, 73 insertions(+), 59 deletions(-)
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 702070d..8613972 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -1083,10 +1083,10 @@ Segments table provides details on all Druid segments,
whether they are publishe
|is_available|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 if this segment is currently being served by any process(Historical or
realtime). See the [Architecture
page](../design/architecture.md#segment-lifecycle) for more details.|
|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 if this segment is _only_ served by realtime tasks, and 0 if any
historical process is serving this segment.|
|is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 =
false. 1 if this segment is published and is _fully_ overshadowed by some other
published segments. Currently, is_overshadowed is always false for unpublished
segments, although this may change in the future. You can filter for segments
that "should be published" by filtering for `is_published = 1 AND
is_overshadowed = 0`. Segments can briefly be both published and overshadowed
if they were recently replaced, b [...]
-|shardSpec|STRING|The toString of specific `ShardSpec`|
-|dimensions|STRING|The dimensions of the segment|
-|metrics|STRING|The metrics of the segment|
-|last_compaction_state|STRING|The configurations of the compaction task which
created this segment. May be null if segment was not created by compaction
task.|
+|shard_spec|STRING|JSON-serialized form of the segment `ShardSpec`|
+|dimensions|STRING|JSON-serialized form of the segment dimensions|
+|metrics|STRING|JSON-serialized form of the segment metrics|
+|last_compaction_state|STRING|JSON-serialized form of the compaction task's
config (compaction task which created this segment). May be null if segment was
not created by compaction task.|
For example to retrieve all segments for datasource "wikipedia", use the query:
diff --git
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index b09e9de..d064733 100644
---
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -13,9 +13,9 @@
"is_available": 1,
"is_realtime": 0,
"is_overshadowed": 0,
- "shardSpec": "NoneShardSpec",
- "dimensions": "[anonymous, area_code, city, continent_code, country_name,
dma_code, geo, language, namespace, network, newpage, page, postal_code,
region_lookup, robot, unpatrolled, user]",
- "metrics": "[added, count, deleted, delta, delta_hist, unique_users,
variation]",
+ "shard_spec": "{\"type\":\"none\"}",
+ "dimensions":
"[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
+ "metrics":
"[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
"last_compaction_state": null
}
]
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 16e5c0a..19c6b22 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -19,6 +19,7 @@
package org.apache.druid.sql.calcite.schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -142,7 +143,7 @@ public class SystemSchema extends AbstractSchema
.add("is_available", ValueType.LONG)
.add("is_realtime", ValueType.LONG)
.add("is_overshadowed", ValueType.LONG)
- .add("shardSpec", ValueType.STRING)
+ .add("shard_spec", ValueType.STRING)
.add("dimensions", ValueType.STRING)
.add("metrics", ValueType.STRING)
.add("last_compaction_state", ValueType.STRING)
@@ -213,7 +214,7 @@ public class SystemSchema extends AbstractSchema
{
Preconditions.checkNotNull(serverView, "serverView");
this.tableMap = ImmutableMap.of(
- SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView,
authorizerMapper),
+ SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView,
jsonMapper, authorizerMapper),
SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider,
serverInventoryView, authorizerMapper),
SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView,
authorizerMapper),
TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper,
authorizerMapper),
@@ -233,17 +234,20 @@ public class SystemSchema extends AbstractSchema
static class SegmentsTable extends AbstractTable implements ScannableTable
{
private final DruidSchema druidSchema;
+ private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
private final MetadataSegmentView metadataView;
public SegmentsTable(
DruidSchema druidSchemna,
MetadataSegmentView metadataView,
+ ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper
)
{
this.druidSchema = druidSchemna;
this.metadataView = metadataView;
+ this.jsonMapper = jsonMapper;
this.authorizerMapper = authorizerMapper;
}
@@ -296,25 +300,30 @@ public class SystemSchema extends AbstractSchema
isAvailable = partialSegmentData.isAvailable();
isRealtime = partialSegmentData.isRealtime();
}
- return new Object[]{
- segment.getId(),
- segment.getDataSource(),
- segment.getInterval().getStart().toString(),
- segment.getInterval().getEnd().toString(),
- segment.getSize(),
- segment.getVersion(),
- (long) segment.getShardSpec().getPartitionNum(),
- numReplicas,
- numRows,
- IS_PUBLISHED_TRUE, //is_published is true for published
segments
- isAvailable,
- isRealtime,
- val.isOvershadowed() ? IS_OVERSHADOWED_TRUE :
IS_OVERSHADOWED_FALSE,
- segment.getShardSpec(),
- segment.getDimensions(),
- segment.getMetrics(),
- segment.getLastCompactionState()
- };
+ try {
+ return new Object[]{
+ segment.getId(),
+ segment.getDataSource(),
+ segment.getInterval().getStart().toString(),
+ segment.getInterval().getEnd().toString(),
+ segment.getSize(),
+ segment.getVersion(),
+ (long) segment.getShardSpec().getPartitionNum(),
+ numReplicas,
+ numRows,
+ IS_PUBLISHED_TRUE, //is_published is true for published
segments
+ isAvailable,
+ isRealtime,
+ val.isOvershadowed() ? IS_OVERSHADOWED_TRUE :
IS_OVERSHADOWED_FALSE,
+ segment.getShardSpec() == null ? null :
jsonMapper.writeValueAsString(segment.getShardSpec()),
+ segment.getDimensions() == null ? null :
jsonMapper.writeValueAsString(segment.getDimensions()),
+ segment.getMetrics() == null ? null :
jsonMapper.writeValueAsString(segment.getMetrics()),
+ segment.getLastCompactionState() == null ? null :
jsonMapper.writeValueAsString(segment.getLastCompactionState())
+ };
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
});
final FluentIterable<Object[]> availableSegments = FluentIterable
@@ -328,26 +337,33 @@ public class SystemSchema extends AbstractSchema
}
final PartialSegmentData partialSegmentData =
partialSegmentDataMap.get(val.getKey());
final long numReplicas = partialSegmentData == null ? 0L :
partialSegmentData.getNumReplicas();
- return new Object[]{
- val.getKey(),
- val.getKey().getDataSource(),
- val.getKey().getInterval().getStart().toString(),
- val.getKey().getInterval().getEnd().toString(),
- val.getValue().getSegment().getSize(),
- val.getKey().getVersion(),
- (long)
val.getValue().getSegment().getShardSpec().getPartitionNum(),
- numReplicas,
- val.getValue().getNumRows(),
- IS_PUBLISHED_FALSE, // is_published is false for unpublished
segments
- // is_available is assumed to be always true for segments
announced by historicals or realtime tasks
- IS_AVAILABLE_TRUE,
- val.getValue().isRealtime(),
- IS_OVERSHADOWED_FALSE, // there is an assumption here that
unpublished segments are never overshadowed
- val.getValue().getSegment().getShardSpec(),
- val.getValue().getSegment().getDimensions(),
- val.getValue().getSegment().getMetrics(),
- null // unpublished segments from realtime tasks will not be
compacted yet
- };
+ try {
+ return new Object[]{
+ val.getKey(),
+ val.getKey().getDataSource(),
+ val.getKey().getInterval().getStart().toString(),
+ val.getKey().getInterval().getEnd().toString(),
+ val.getValue().getSegment().getSize(),
+ val.getKey().getVersion(),
+ (long)
val.getValue().getSegment().getShardSpec().getPartitionNum(),
+ numReplicas,
+ val.getValue().getNumRows(),
+ IS_PUBLISHED_FALSE,
+ // is_published is false for unpublished segments
+ // is_available is assumed to be always true for segments
announced by historicals or realtime tasks
+ IS_AVAILABLE_TRUE,
+ val.getValue().isRealtime(),
+ IS_OVERSHADOWED_FALSE,
+ // there is an assumption here that unpublished segments are
never overshadowed
+ val.getValue().getSegment().getShardSpec() == null ? null :
jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
+ val.getValue().getSegment().getDimensions() == null ? null :
jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
+ val.getValue().getSegment().getMetrics() == null ? null :
jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
+ null // unpublished segments from realtime tasks will not be
compacted yet
+ };
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
});
final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index f943aaf..46aa3cc 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -95,7 +95,6 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -523,9 +522,9 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
- public void testSegmentsTable()
+ public void testSegmentsTable() throws Exception
{
- final SegmentsTable segmentsTable = new SegmentsTable(druidSchema,
metadataView, authMapper);
+ final SegmentsTable segmentsTable = new SegmentsTable(druidSchema,
metadataView, new ObjectMapper(), authMapper);
final Set<SegmentWithOvershadowedStatus> publishedSegments = new
HashSet<>(Arrays.asList(
new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
@@ -706,7 +705,7 @@ public class SystemSchemaTest extends CalciteTestBase
long isRealtime,
long isOvershadowed,
CompactionState compactionState
- )
+ ) throws Exception
{
Assert.assertEquals(segmentId, row[0].toString());
SegmentId id =
Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -722,7 +721,11 @@ public class SystemSchemaTest extends CalciteTestBase
Assert.assertEquals(isAvailable, row[10]);
Assert.assertEquals(isRealtime, row[11]);
Assert.assertEquals(isOvershadowed, row[12]);
- Assert.assertEquals(compactionState, row[16]);
+ if (compactionState == null) {
+ Assert.assertNull(row[16]);
+ } else {
+ Assert.assertEquals(mapper.writeValueAsString(compactionState), row[16]);
+ }
}
@Test
@@ -1289,12 +1292,6 @@ public class SystemSchemaTest extends CalciteTestBase
case STRING:
if (signature.getColumnName(i).equals("segment_id")) {
expectedClass = SegmentId.class;
- } else if (signature.getColumnName(i).equals("shardSpec")) {
- expectedClass = ShardSpec.class;
- } else if
(signature.getColumnName(i).equals("last_compaction_state")) {
- expectedClass = CompactionState.class;
- } else if (signature.getColumnName(i).equals("dimensions") ||
signature.getColumnName(i).equals("metrics")) {
- expectedClass = List.class;
} else {
expectedClass = String.class;
}
diff --git a/website/.spelling b/website/.spelling
index 343be4d..096b300 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1515,6 +1515,7 @@ queue_insertion_time
runner_status
segment_id
server_type
+shard_spec
sqlTimeZone
supervisor_id
sys
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]