This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3538abd  Make sure all fields in sys.segments are JSON-serialized (#10481)
3538abd is described below

commit 3538abd5d064d11a03cea6458782e5f4b52d3c45
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Wed Oct 14 13:49:46 2020 -0700

    Make sure all fields in sys.segments are JSON-serialized (#10481)
    
    * fix JSON format
    
    * Change all columns in sys segments to be JSON
    
    * Change all columns in sys segments to be JSON
    
    * add tests
    
    * fix failing tests
    
    * fix failing tests
---
 docs/querying/sql.md                               |  8 +-
 .../results/auth_test_sys_schema_segments.json     |  6 +-
 .../druid/sql/calcite/schema/SystemSchema.java     | 98 +++++++++++++---------
 .../druid/sql/calcite/schema/SystemSchemaTest.java | 19 ++---
 website/.spelling                                  |  1 +
 5 files changed, 73 insertions(+), 59 deletions(-)

diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 702070d..8613972 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -1083,10 +1083,10 @@ Segments table provides details on all Druid segments, 
whether they are publishe
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is currently being served by any process(Historical or 
realtime). See the [Architecture 
page](../design/architecture.md#segment-lifecycle) for more details.|
 |is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is _only_ served by realtime tasks, and 0 if any 
historical process is serving this segment.|
 |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is published and is _fully_ overshadowed by some other 
published segments. Currently, is_overshadowed is always false for unpublished 
segments, although this may change in the future. You can filter for segments 
that "should be published" by filtering for `is_published = 1 AND 
is_overshadowed = 0`. Segments can briefly be both published and overshadowed 
if they were recently replaced, b [...]
-|shardSpec|STRING|The toString of specific `ShardSpec`|
-|dimensions|STRING|The dimensions of the segment|
-|metrics|STRING|The metrics of the segment|
-|last_compaction_state|STRING|The configurations of the compaction task which 
created this segment. May be null if segment was not created by compaction 
task.|
+|shard_spec|STRING|JSON-serialized form of the segment `ShardSpec`|
+|dimensions|STRING|JSON-serialized form of the segment dimensions|
+|metrics|STRING|JSON-serialized form of the segment metrics|
+|last_compaction_state|STRING|JSON-serialized form of the compaction task's 
config (compaction task which created this segment). May be null if segment was 
not created by compaction task.|
 
 For example to retrieve all segments for datasource "wikipedia", use the query:
 
diff --git 
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
 
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index b09e9de..d064733 100644
--- 
a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ 
b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -13,9 +13,9 @@
     "is_available": 1,
     "is_realtime": 0,
     "is_overshadowed": 0,
-    "shardSpec": "NoneShardSpec",
-    "dimensions": "[anonymous, area_code, city, continent_code, country_name, 
dma_code, geo, language, namespace, network, newpage, page, postal_code, 
region_lookup, robot, unpatrolled, user]",
-    "metrics": "[added, count, deleted, delta, delta_hist, unique_users, 
variation]",
+    "shard_spec": "{\"type\":\"none\"}",
+    "dimensions": 
"[\"anonymous\",\"area_code\",\"city\",\"continent_code\",\"country_name\",\"dma_code\",\"geo\",\"language\",\"namespace\",\"network\",\"newpage\",\"page\",\"postal_code\",\"region_lookup\",\"robot\",\"unpatrolled\",\"user\"]",
+    "metrics": 
"[\"added\",\"count\",\"deleted\",\"delta\",\"delta_hist\",\"unique_users\",\"variation\"]",
     "last_compaction_state": null
   }
 ]
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 16e5c0a..19c6b22 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -142,7 +143,7 @@ public class SystemSchema extends AbstractSchema
       .add("is_available", ValueType.LONG)
       .add("is_realtime", ValueType.LONG)
       .add("is_overshadowed", ValueType.LONG)
-      .add("shardSpec", ValueType.STRING)
+      .add("shard_spec", ValueType.STRING)
       .add("dimensions", ValueType.STRING)
       .add("metrics", ValueType.STRING)
       .add("last_compaction_state", ValueType.STRING)
@@ -213,7 +214,7 @@ public class SystemSchema extends AbstractSchema
   {
     Preconditions.checkNotNull(serverView, "serverView");
     this.tableMap = ImmutableMap.of(
-        SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, 
authorizerMapper),
+        SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView, 
jsonMapper, authorizerMapper),
         SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, 
serverInventoryView, authorizerMapper),
         SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, 
authorizerMapper),
         TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, 
authorizerMapper),
@@ -233,17 +234,20 @@ public class SystemSchema extends AbstractSchema
   static class SegmentsTable extends AbstractTable implements ScannableTable
   {
     private final DruidSchema druidSchema;
+    private final ObjectMapper jsonMapper;
     private final AuthorizerMapper authorizerMapper;
     private final MetadataSegmentView metadataView;
 
     public SegmentsTable(
         DruidSchema druidSchemna,
         MetadataSegmentView metadataView,
+        ObjectMapper jsonMapper,
         AuthorizerMapper authorizerMapper
     )
     {
       this.druidSchema = druidSchemna;
       this.metadataView = metadataView;
+      this.jsonMapper = jsonMapper;
       this.authorizerMapper = authorizerMapper;
     }
 
@@ -296,25 +300,30 @@ public class SystemSchema extends AbstractSchema
               isAvailable = partialSegmentData.isAvailable();
               isRealtime = partialSegmentData.isRealtime();
             }
-            return new Object[]{
-                segment.getId(),
-                segment.getDataSource(),
-                segment.getInterval().getStart().toString(),
-                segment.getInterval().getEnd().toString(),
-                segment.getSize(),
-                segment.getVersion(),
-                (long) segment.getShardSpec().getPartitionNum(),
-                numReplicas,
-                numRows,
-                IS_PUBLISHED_TRUE, //is_published is true for published 
segments
-                isAvailable,
-                isRealtime,
-                val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : 
IS_OVERSHADOWED_FALSE,
-                segment.getShardSpec(),
-                segment.getDimensions(),
-                segment.getMetrics(),
-                segment.getLastCompactionState()
-            };
+            try {
+              return new Object[]{
+                  segment.getId(),
+                  segment.getDataSource(),
+                  segment.getInterval().getStart().toString(),
+                  segment.getInterval().getEnd().toString(),
+                  segment.getSize(),
+                  segment.getVersion(),
+                  (long) segment.getShardSpec().getPartitionNum(),
+                  numReplicas,
+                  numRows,
+                  IS_PUBLISHED_TRUE, //is_published is true for published 
segments
+                  isAvailable,
+                  isRealtime,
+                  val.isOvershadowed() ? IS_OVERSHADOWED_TRUE : 
IS_OVERSHADOWED_FALSE,
+                  segment.getShardSpec() == null ? null : 
jsonMapper.writeValueAsString(segment.getShardSpec()),
+                  segment.getDimensions() == null ? null : 
jsonMapper.writeValueAsString(segment.getDimensions()),
+                  segment.getMetrics() == null ? null : 
jsonMapper.writeValueAsString(segment.getMetrics()),
+                  segment.getLastCompactionState() == null ? null : 
jsonMapper.writeValueAsString(segment.getLastCompactionState())
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(e);
+            }
           });
 
       final FluentIterable<Object[]> availableSegments = FluentIterable
@@ -328,26 +337,33 @@ public class SystemSchema extends AbstractSchema
             }
             final PartialSegmentData partialSegmentData = 
partialSegmentDataMap.get(val.getKey());
             final long numReplicas = partialSegmentData == null ? 0L : 
partialSegmentData.getNumReplicas();
-            return new Object[]{
-                val.getKey(),
-                val.getKey().getDataSource(),
-                val.getKey().getInterval().getStart().toString(),
-                val.getKey().getInterval().getEnd().toString(),
-                val.getValue().getSegment().getSize(),
-                val.getKey().getVersion(),
-                (long) 
val.getValue().getSegment().getShardSpec().getPartitionNum(),
-                numReplicas,
-                val.getValue().getNumRows(),
-                IS_PUBLISHED_FALSE, // is_published is false for unpublished 
segments
-                // is_available is assumed to be always true for segments 
announced by historicals or realtime tasks
-                IS_AVAILABLE_TRUE,
-                val.getValue().isRealtime(),
-                IS_OVERSHADOWED_FALSE, // there is an assumption here that 
unpublished segments are never overshadowed
-                val.getValue().getSegment().getShardSpec(),
-                val.getValue().getSegment().getDimensions(),
-                val.getValue().getSegment().getMetrics(),
-                null // unpublished segments from realtime tasks will not be 
compacted yet
-            };
+            try {
+              return new Object[]{
+                  val.getKey(),
+                  val.getKey().getDataSource(),
+                  val.getKey().getInterval().getStart().toString(),
+                  val.getKey().getInterval().getEnd().toString(),
+                  val.getValue().getSegment().getSize(),
+                  val.getKey().getVersion(),
+                  (long) 
val.getValue().getSegment().getShardSpec().getPartitionNum(),
+                  numReplicas,
+                  val.getValue().getNumRows(),
+                  IS_PUBLISHED_FALSE,
+                  // is_published is false for unpublished segments
+                  // is_available is assumed to be always true for segments 
announced by historicals or realtime tasks
+                  IS_AVAILABLE_TRUE,
+                  val.getValue().isRealtime(),
+                  IS_OVERSHADOWED_FALSE,
+                  // there is an assumption here that unpublished segments are 
never overshadowed
+                  val.getValue().getSegment().getShardSpec() == null ? null : 
jsonMapper.writeValueAsString(val.getValue().getSegment().getShardSpec()),
+                  val.getValue().getSegment().getDimensions() == null ? null : 
jsonMapper.writeValueAsString(val.getValue().getSegment().getDimensions()),
+                  val.getValue().getSegment().getMetrics() == null ? null : 
jsonMapper.writeValueAsString(val.getValue().getSegment().getMetrics()),
+                  null // unpublished segments from realtime tasks will not be 
compacted yet
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(e);
+            }
           });
 
       final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index f943aaf..46aa3cc 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -95,7 +95,6 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentWithOvershadowedStatus;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -523,9 +522,9 @@ public class SystemSchemaTest extends CalciteTestBase
   }
 
   @Test
-  public void testSegmentsTable()
+  public void testSegmentsTable() throws Exception
   {
-    final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, 
metadataView, authMapper);
+    final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, 
metadataView, new ObjectMapper(), authMapper);
     final Set<SegmentWithOvershadowedStatus> publishedSegments = new 
HashSet<>(Arrays.asList(
         new SegmentWithOvershadowedStatus(publishedCompactedSegment1, true),
         new SegmentWithOvershadowedStatus(publishedCompactedSegment2, false),
@@ -706,7 +705,7 @@ public class SystemSchemaTest extends CalciteTestBase
       long isRealtime,
       long isOvershadowed,
       CompactionState compactionState
-  )
+  ) throws Exception
   {
     Assert.assertEquals(segmentId, row[0].toString());
     SegmentId id = 
Iterables.get(SegmentId.iterateAllPossibleParsings(segmentId), 0);
@@ -722,7 +721,11 @@ public class SystemSchemaTest extends CalciteTestBase
     Assert.assertEquals(isAvailable, row[10]);
     Assert.assertEquals(isRealtime, row[11]);
     Assert.assertEquals(isOvershadowed, row[12]);
-    Assert.assertEquals(compactionState, row[16]);
+    if (compactionState == null) {
+      Assert.assertNull(row[16]);
+    } else {
+      Assert.assertEquals(mapper.writeValueAsString(compactionState), row[16]);
+    }
   }
 
   @Test
@@ -1289,12 +1292,6 @@ public class SystemSchemaTest extends CalciteTestBase
           case STRING:
             if (signature.getColumnName(i).equals("segment_id")) {
               expectedClass = SegmentId.class;
-            } else if (signature.getColumnName(i).equals("shardSpec")) {
-              expectedClass = ShardSpec.class;
-            } else if 
(signature.getColumnName(i).equals("last_compaction_state")) {
-              expectedClass = CompactionState.class;
-            } else if (signature.getColumnName(i).equals("dimensions") || 
signature.getColumnName(i).equals("metrics")) {
-              expectedClass = List.class;
             } else {
               expectedClass = String.class;
             }
diff --git a/website/.spelling b/website/.spelling
index 343be4d..096b300 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1515,6 +1515,7 @@ queue_insertion_time
 runner_status
 segment_id
 server_type
+shard_spec
 sqlTimeZone
 supervisor_id
 sys


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to