This is an automated email from the ASF dual-hosted git repository.

davidlim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f0ecdfe  Fix `is_realtime` column behavior in sys.segments table 
(#8154)
f0ecdfe is described below

commit f0ecdfee30853535e4bfbbfc4f769ccd12d0694a
Author: Surekha <[email protected]>
AuthorDate: Wed Jul 31 21:26:49 2019 -0700

    Fix `is_realtime` column behavior in sys.segments table (#8154)
    
    * Fix is_realtime flag
    
    * make variable final
    
    * minor changes
    
    * Modify is_realtime behavior based on review comment
    
    * Fix UT
---
 docs/content/querying/sql.md                       |  2 +-
 .../calcite/schema/AvailableSegmentMetadata.java   | 21 ++++--
 .../druid/sql/calcite/schema/DruidSchema.java      | 43 +++++++----
 .../druid/sql/calcite/schema/DruidSchemaTest.java  | 84 +++++++++++++++++++---
 .../sql/calcite/util/TestServerInventoryView.java  | 13 +++-
 5 files changed, 133 insertions(+), 30 deletions(-)

diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 8dfb0e8..3678024 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -655,7 +655,7 @@ Note that a segment can be served by more than one stream 
ingestion tasks or His
 |num_rows|LONG|Number of rows in current segment, this value could be null if 
unknown to Broker at query time|
 |is_published|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 represents this segment has been published to the metadata store with 
`used=1`|
 |is_available|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is currently being served by any process (Historical or 
realtime)|
-|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is being served on any type of realtime tasks|
+|is_realtime|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is _only_ served by realtime tasks, and 0 if any 
historical process is serving this segment|
 |is_overshadowed|LONG|Boolean is represented as long type where 1 = true, 0 = 
false. 1 if this segment is published and is _fully_ overshadowed by some other 
published segments. Currently, is_overshadowed is always false for unpublished 
segments, although this may change in the future. You can filter for segments 
that "should be published" by filtering for `is_published = 1 AND 
is_overshadowed = 0`. Segments can briefly be both published and overshadowed 
if they were recently replaced, b [...]
 |payload|STRING|JSON-serialized data segment payload|
 
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
index 1dc8e1e..4efff11 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/AvailableSegmentMetadata.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.sql.calcite.schema;
 
+import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.timeline.DataSegment;
 
@@ -34,7 +35,7 @@ public class AvailableSegmentMetadata
   public static Builder builder(
       DataSegment segment,
       long isRealtime,
-      Set<String> segmentServers,
+      Set<DruidServerMetadata> segmentServers,
       RowSignature rowSignature,
       long numRows
   )
@@ -58,7 +59,7 @@ public class AvailableSegmentMetadata
   // to make it easy to count number of segments which are realtime
   private final long isRealtime;
   // set of servers that contain the segment
-  private final Set<String> segmentServers;
+  private final Set<DruidServerMetadata> segmentServers;
   private final long numRows;
   @Nullable
   private final RowSignature rowSignature;
@@ -82,7 +83,7 @@ public class AvailableSegmentMetadata
     return segment;
   }
 
-  public Set<String> getReplicas()
+  public Set<DruidServerMetadata> getReplicas()
   {
     return segmentServers;
   }
@@ -106,9 +107,9 @@ public class AvailableSegmentMetadata
   public static class Builder
   {
     private final DataSegment segment;
-    private final long isRealtime;
 
-    private Set<String> segmentServers;
+    private long isRealtime;
+    private Set<DruidServerMetadata> segmentServers;
     @Nullable
     private RowSignature rowSignature;
     private long numRows;
@@ -116,7 +117,7 @@ public class AvailableSegmentMetadata
     private Builder(
         DataSegment segment,
         long isRealtime,
-        Set<String> servers,
+        Set<DruidServerMetadata> servers,
         @Nullable RowSignature rowSignature,
         long numRows
     )
@@ -140,12 +141,18 @@ public class AvailableSegmentMetadata
       return this;
     }
 
-    public Builder withReplicas(Set<String> servers)
+    public Builder withReplicas(Set<DruidServerMetadata> servers)
     {
       this.segmentServers = servers;
       return this;
     }
 
+    public Builder withRealtime(long isRealtime)
+    {
+      this.isRealtime = isRealtime;
+      return this;
+    }
+
     public AvailableSegmentMetadata build()
     {
       return new AvailableSegmentMetadata(this);
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 2116fbb..55a9a79 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -57,6 +57,7 @@ import 
org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Escalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -73,6 +74,7 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
@@ -359,14 +361,12 @@ public class DruidSchema extends AbstractSchema
       final Map<SegmentId, AvailableSegmentMetadata> knownSegments = 
segmentMetadataInfo.get(segment.getDataSource());
       AvailableSegmentMetadata segmentMetadata = knownSegments != null ? 
knownSegments.get(segment.getId()) : null;
       if (segmentMetadata == null) {
-        // segmentReplicatable is used to determine if segments are served by 
realtime servers or not
-        final long isRealtime = server.segmentReplicatable() ? 0 : 1;
-
-        final Set<String> servers = ImmutableSet.of(server.getName());
+        // segmentReplicatable is used to determine if segments are served by 
historical or realtime servers
+        long isRealtime = server.segmentReplicatable() ? 0 : 1;
         segmentMetadata = AvailableSegmentMetadata.builder(
             segment,
             isRealtime,
-            servers,
+            ImmutableSet.of(server),
             null,
             DEFAULT_NUM_ROWS
         ).build();
@@ -380,14 +380,15 @@ public class DruidSchema extends AbstractSchema
           log.debug("Added new immutable segment[%s].", segment.getId());
         }
       } else {
-        final Set<String> segmentServers = segmentMetadata.getReplicas();
-        final ImmutableSet<String> servers = new ImmutableSet.Builder<String>()
+        final Set<DruidServerMetadata> segmentServers = 
segmentMetadata.getReplicas();
+        final ImmutableSet<DruidServerMetadata> servers = new 
ImmutableSet.Builder<DruidServerMetadata>()
             .addAll(segmentServers)
-            .add(server.getName())
+            .add(server)
             .build();
         final AvailableSegmentMetadata metadataWithNumReplicas = 
AvailableSegmentMetadata
             .from(segmentMetadata)
             .withReplicas(servers)
+            .withRealtime(recomputeIsRealtime(servers))
             .build();
         knownSegments.put(segment.getId(), metadataWithNumReplicas);
         if (server.segmentReplicatable()) {
@@ -431,19 +432,23 @@ public class DruidSchema extends AbstractSchema
     }
   }
 
-  private void removeServerSegment(final DruidServerMetadata server, final 
DataSegment segment)
+  @VisibleForTesting
+  void removeServerSegment(final DruidServerMetadata server, final DataSegment 
segment)
   {
     synchronized (lock) {
       log.debug("Segment[%s] is gone from server[%s]", segment.getId(), 
server.getName());
       final Map<SegmentId, AvailableSegmentMetadata> knownSegments = 
segmentMetadataInfo.get(segment.getDataSource());
       final AvailableSegmentMetadata segmentMetadata = 
knownSegments.get(segment.getId());
-      final Set<String> segmentServers = segmentMetadata.getReplicas();
-      final ImmutableSet<String> servers = FluentIterable.from(segmentServers)
-                                                         
.filter(Predicates.not(Predicates.equalTo(server.getName())))
-                                                         .toSet();
+      final Set<DruidServerMetadata> segmentServers = 
segmentMetadata.getReplicas();
+      final ImmutableSet<DruidServerMetadata> servers = FluentIterable
+          .from(segmentServers)
+          .filter(Predicates.not(Predicates.equalTo(server)))
+          .toSet();
+
       final AvailableSegmentMetadata metadataWithNumReplicas = 
AvailableSegmentMetadata
           .from(segmentMetadata)
           .withReplicas(servers)
+          .withRealtime(recomputeIsRealtime(servers))
           .build();
       knownSegments.put(segment.getId(), metadataWithNumReplicas);
       lock.notifyAll();
@@ -475,6 +480,18 @@ public class DruidSchema extends AbstractSchema
     return retVal;
   }
 
+  private long recomputeIsRealtime(ImmutableSet<DruidServerMetadata> servers)
+  {
+    final Optional<DruidServerMetadata> historicalServer = servers
+        .stream()
+        .filter(metadata -> metadata.getType().equals(ServerType.HISTORICAL))
+        .findAny();
+
+    // if there is any historical server in the replicas, isRealtime flag 
should be unset
+    final long isRealtime = historicalServer.isPresent() ? 0 : 1;
+    return isRealtime;
+  }
+
   /**
    * Attempt to refresh "segmentSignatures" for a set of segments for a 
particular dataSource. Returns the set of
    * segments actually refreshed, which may be a subset of the asked-for set.
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index 08fcb7e..1b2573e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.table.DruidTable;
@@ -54,6 +55,7 @@ import org.apache.druid.sql.calcite.view.NoopViewManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -168,8 +170,20 @@ public class DruidSchemaTest extends CalciteTestBase
                    .build(),
         index2
     );
-
-    final TimelineServerView serverView = new 
TestServerInventoryView(walker.getSegments());
+    final DataSegment segment1 = new DataSegment(
+        "foo3",
+        Intervals.of("2012/2013"),
+        "version3",
+        null,
+        ImmutableList.of("dim1", "dim2"),
+        ImmutableList.of("met1", "met2"),
+        new NumberedShardSpec(2, 3),
+        1,
+        100L,
+        DataSegment.PruneLoadSpecHolder.DEFAULT
+    );
+    final List<DataSegment> realtimeSegments = ImmutableList.of(segment1);
+    final TimelineServerView serverView = new 
TestServerInventoryView(walker.getSegments(), realtimeSegments);
     druidServers = serverView.getDruidServers();
 
     schema = new DruidSchema(
@@ -253,14 +267,14 @@ public class DruidSchemaTest extends CalciteTestBase
    * is called more than once for same segment
    */
   @Test
-  public void testSegmentMetadataHolderNumRows()
+  public void testAvailableSegmentMetadataNumRows()
   {
     Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = 
schema.getSegmentMetadataSnapshot();
     final List<DataSegment> segments = segmentsMetadata.values()
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(3, segments.size());
+    Assert.assertEquals(4, segments.size());
     // find the only segment with datasource "foo2"
     final DataSegment existingSegment = segments.stream()
                                                 .filter(segment -> 
segment.getDataSource().equals("foo2"))
@@ -309,7 +323,7 @@ public class DruidSchemaTest extends CalciteTestBase
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(segments.size(), 3);
+    Assert.assertEquals(4, segments.size());
     // segments contains two segments with datasource "foo", one with 
datasource "foo2" and one realtime segment with datasource "foo3"
     // let's remove the only segment with datasource "foo2"
     final DataSegment segmentToRemove = segments.stream()
@@ -321,7 +335,7 @@ public class DruidSchemaTest extends CalciteTestBase
 
     // The following line can cause NPE without segmentMetadata null check in 
DruidSchema#refreshSegmentsForDataSource
     
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
-    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
+    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
   }
 
   @Test
@@ -332,7 +346,7 @@ public class DruidSchemaTest extends CalciteTestBase
                                                        .stream()
                                                        
.map(AvailableSegmentMetadata::getSegment)
                                                        
.collect(Collectors.toList());
-    Assert.assertEquals(segments.size(), 3);
+    Assert.assertEquals(4, segments.size());
     // remove one of the segments with datasource "foo"
     final DataSegment segmentToRemove = segments.stream()
                                                 .filter(segment -> 
segment.getDataSource().equals("foo"))
@@ -343,7 +357,61 @@ public class DruidSchemaTest extends CalciteTestBase
 
     // The following line can cause NPE without segmentMetadata null check in 
DruidSchema#refreshSegmentsForDataSource
     
schema.refreshSegments(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()));
-    Assert.assertEquals(schema.getSegmentMetadataSnapshot().size(), 2);
+    Assert.assertEquals(3, schema.getSegmentMetadataSnapshot().size());
+  }
+
+  @Test
+  public void testAvailableSegmentMetadataIsRealtime()
+  {
+    Map<SegmentId, AvailableSegmentMetadata> segmentsMetadata = 
schema.getSegmentMetadataSnapshot();
+    final List<DataSegment> segments = segmentsMetadata.values()
+                                                       .stream()
+                                                       
.map(AvailableSegmentMetadata::getSegment)
+                                                       
.collect(Collectors.toList());
+    // find the only realtime segment with datasource "foo3"
+    final DataSegment existingSegment = segments.stream()
+                                                .filter(segment -> 
segment.getDataSource().equals("foo3"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertNotNull(existingSegment);
+    final AvailableSegmentMetadata metadata = 
segmentsMetadata.get(existingSegment.getId());
+    Assert.assertEquals(1L, metadata.isRealtime());
+    // get the historical server
+    final ImmutableDruidServer historicalServer = druidServers.stream()
+                                                        .filter(s -> 
s.getType().equals(ServerType.HISTORICAL))
+                                                        .findAny()
+                                                        .orElse(null);
+
+    Assert.assertNotNull(historicalServer);
+    final DruidServerMetadata historicalServerMetadata = 
historicalServer.getMetadata();
+
+    // add existingSegment to historical
+    schema.addSegment(historicalServerMetadata, existingSegment);
+    segmentsMetadata = schema.getSegmentMetadataSnapshot();
+    // get the segment with datasource "foo3"
+    DataSegment currentSegment = segments.stream()
+                                         .filter(segment -> 
segment.getDataSource().equals("foo3"))
+                                         .findFirst()
+                                         .orElse(null);
+    Assert.assertNotNull(currentSegment);
+    AvailableSegmentMetadata currentMetadata = 
segmentsMetadata.get(currentSegment.getId());
+    Assert.assertEquals(0L, currentMetadata.isRealtime());
+
+    ImmutableDruidServer realtimeServer = druidServers.stream()
+                                                      .filter(s -> 
s.getType().equals(ServerType.REALTIME))
+                                                      .findAny()
+                                                      .orElse(null);
+    Assert.assertNotNull(realtimeServer);
+    // drop existingSegment from realtime task
+    schema.removeServerSegment(realtimeServer.getMetadata(), existingSegment);
+    segmentsMetadata = schema.getSegmentMetadataSnapshot();
+    currentSegment = segments.stream()
+                             .filter(segment -> 
segment.getDataSource().equals("foo3"))
+                             .findFirst()
+                             .orElse(null);
+    Assert.assertNotNull(currentSegment);
+    currentMetadata = segmentsMetadata.get(currentSegment.getId());
+    Assert.assertEquals(0L, currentMetadata.isRealtime());
   }
 
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
index 2dcc569..0f36273 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestServerInventoryView.java
@@ -93,7 +93,18 @@ public class TestServerInventoryView implements 
TimelineServerView
         ImmutableMap.of("src", dataSource),
         1
     );
-    return ImmutableList.of(server);
+    final ImmutableDruidDataSource dataSource2 = new ImmutableDruidDataSource(
+        "DUMMY2",
+        Collections.emptyMap(),
+        realtimeSegments
+    );
+    final ImmutableDruidServer realtimeServer = new ImmutableDruidServer(
+        DUMMY_SERVER_REALTIME,
+        0L,
+        ImmutableMap.of("src", dataSource2),
+        1
+    );
+    return ImmutableList.of(server, realtimeServer);
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to