This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3be4a97  Fix inconsistent segment size(#6448) (#6451)
3be4a97 is described below

commit 3be4a9715061197cee4e121d54b23ccfdd2cbf8c
Author: Surekha <[email protected]>
AuthorDate: Fri Oct 12 12:55:20 2018 -0700

    Fix inconsistent segment size(#6448) (#6451)
    
    * Fix inconsistent segment size(#6448)
    
    * Fix the segment size for published segments
    * Changes to get numReplicas
    * Make coordinator segments API truly streaming
    
    * Changes to store partial segment data
    
    * Simplify SegmentMetadataHolder
    * Store the partial columns from available segments
    
    * Address comments
---
 .../apache/druid/server/http/MetadataResource.java |   9 +-
 .../druid/sql/calcite/schema/DruidSchema.java      |  14 +-
 .../sql/calcite/schema/SegmentMetadataHolder.java  |  12 +-
 .../druid/sql/calcite/schema/SystemSchema.java     | 142 ++++++++++++++-------
 .../druid/sql/calcite/schema/SystemSchemaTest.java |  18 +--
 5 files changed, 129 insertions(+), 66 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java 
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 781842f..d4db118 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -55,7 +55,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  */
@@ -151,16 +151,15 @@ public class MetadataResource
   public Response getDatabaseSegments(@Context final HttpServletRequest req)
   {
     final Collection<ImmutableDruidDataSource> druidDataSources = 
metadataSegmentManager.getInventory();
-    final Set<DataSegment> metadataSegments = druidDataSources
+    final Stream<DataSegment> metadataSegments = druidDataSources
         .stream()
-        .flatMap(t -> t.getSegments().stream())
-        .collect(Collectors.toSet());
+        .flatMap(t -> t.getSegments().stream());
 
     Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> 
Collections.singletonList(
         
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
 
     final Iterable<DataSegment> authorizedSegments = 
AuthorizationUtils.filterAuthorizedResources(
-        req, metadataSegments, raGenerator, authorizerMapper);
+        req, metadataSegments::iterator, raGenerator, authorizerMapper);
 
     final StreamingOutput stream = outputStream -> {
       final JsonFactory jsonFactory = jsonMapper.getFactory();
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 5a64d08..ab43307 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -326,7 +326,13 @@ public class DruidSchema extends AbstractSchema
         // segmentReplicatable is used to determine if segments are served by 
realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
         final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 
: 0;
-        SegmentMetadataHolder holder = new 
SegmentMetadataHolder.Builder(isPublished, 1, isRealtime, 1).build();
+        final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
+            segment.getIdentifier(),
+            isPublished,
+            1,
+            isRealtime,
+            1
+        ).build();
         // Unknown segment.
         setSegmentSignature(segment, holder);
         segmentsNeedingRefresh.add(segment);
@@ -338,8 +344,9 @@ public class DruidSchema extends AbstractSchema
         }
       } else {
         if (knownSegments.containsKey(segment)) {
-          SegmentMetadataHolder holder = knownSegments.get(segment);
-          SegmentMetadataHolder holderWithNumReplicas = new 
SegmentMetadataHolder.Builder(
+          final SegmentMetadataHolder holder = knownSegments.get(segment);
+          final SegmentMetadataHolder holderWithNumReplicas = new 
SegmentMetadataHolder.Builder(
+              holder.getSegmentId(),
               holder.isPublished(),
               holder.isAvailable(),
               holder.isRealtime(),
@@ -452,6 +459,7 @@ public class DruidSchema extends AbstractSchema
             final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = 
segmentMetadataInfo.get(segment.getDataSource());
             SegmentMetadataHolder holder = dataSourceSegments.get(segment);
             SegmentMetadataHolder updatedHolder = new 
SegmentMetadataHolder.Builder(
+                holder.getSegmentId(),
                 holder.isPublished(),
                 holder.isAvailable(),
                 holder.isRealtime(),
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index f3b13bb..377f502 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -34,7 +34,7 @@ public class SegmentMetadataHolder
   private final long isPublished;
   private final long isAvailable;
   private final long isRealtime;
-
+  private final String segmentId;
   private final long numReplicas;
   @Nullable
   private final RowSignature rowSignature;
@@ -49,6 +49,7 @@ public class SegmentMetadataHolder
     this.isRealtime = builder.isRealtime;
     this.numReplicas = builder.numReplicas;
     this.numRows = builder.numRows;
+    this.segmentId = builder.segmentId;
   }
 
   public long isPublished()
@@ -66,6 +67,11 @@ public class SegmentMetadataHolder
     return isRealtime;
   }
 
+  public String getSegmentId()
+  {
+    return segmentId;
+  }
+
   public long getNumReplicas()
   {
     return numReplicas;
@@ -85,9 +91,11 @@ public class SegmentMetadataHolder
 
   public static class Builder
   {
+    private final String segmentId;
     private final long isPublished;
     private final long isAvailable;
     private final long isRealtime;
+
     private long numReplicas;
     @Nullable
     private RowSignature rowSignature;
@@ -95,12 +103,14 @@ public class SegmentMetadataHolder
     private Long numRows;
 
     public Builder(
+        String segmentId,
         long isPublished,
         long isAvailable,
         long isRealtime,
         long numReplicas
     )
     {
+      this.segmentId = segmentId;
       this.isPublished = isPublished;
       this.isAvailable = isAvailable;
       this.isRealtime = isRealtime;
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index da34bbe..e24699d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -48,7 +48,6 @@ import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
@@ -77,10 +76,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class SystemSchema extends AbstractSchema
 {
-  private static final Logger log = new Logger(SystemSchema.class);
 
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
@@ -211,6 +210,13 @@ public class SystemSchema extends AbstractSchema
       final Iterator<Entry<DataSegment, SegmentMetadataHolder>> 
availableSegmentEntries = availableSegmentMetadata.entrySet()
                                                                                
                                   .iterator();
 
+      // in memory map to store segment data from available segments
+      final Map<String, PartialSegmentData> partialSegmentDataMap = 
availableSegmentMetadata.values().stream().collect(
+          Collectors.toMap(
+              SegmentMetadataHolder::getSegmentId,
+              h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), 
h.getNumReplicas(), h.getNumRows())
+          ));
+
       //get published segments from coordinator
       final JsonParserIterator<DataSegment> metadataSegments = 
getMetadataSegments(
           druidLeaderClient,
@@ -218,19 +224,50 @@ public class SystemSchema extends AbstractSchema
           responseHandler
       );
 
-      Set<String> availableSegmentIds = new HashSet<>();
-      //auth check for available segments
-      final Iterator<Entry<DataSegment, SegmentMetadataHolder>> 
authorizedAvailableSegments = getAuthorizedAvailableSegments(
-          availableSegmentEntries,
-          root
-      );
+      final Set<String> segmentsAlreadySeen = new HashSet<>();
+
+      final FluentIterable<Object[]> publishedSegments = FluentIterable
+          .from(() -> getAuthorizedPublishedSegments(
+              metadataSegments,
+              root
+          ))
+          .transform(val -> {
+            try {
+              segmentsAlreadySeen.add(val.getIdentifier());
+              final PartialSegmentData partialSegmentData = 
partialSegmentDataMap.get(val.getIdentifier());
+              return new Object[]{
+                  val.getIdentifier(),
+                  val.getDataSource(),
+                  val.getInterval().getStart(),
+                  val.getInterval().getEnd(),
+                  val.getSize(),
+                  val.getVersion(),
+                  val.getShardSpec().getPartitionNum(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.getNumReplicas(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.getNumRows(),
+                  1L, //is_published is true for published segments
+                  partialSegmentData == null ? 1L : 
partialSegmentData.isAvailable(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.isRealtime(),
+                  jsonMapper.writeValueAsString(val)
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(StringUtils.format(
+                  "Error getting segment payload for segment %s",
+                  val.getIdentifier()
+              ), e);
+            }
+          });
 
       final FluentIterable<Object[]> availableSegments = FluentIterable
-          .from(() -> authorizedAvailableSegments)
+          .from(() -> getAuthorizedAvailableSegments(
+              availableSegmentEntries,
+              root
+          ))
           .transform(val -> {
             try {
-              if (!availableSegmentIds.contains(val.getKey().getIdentifier())) 
{
-                availableSegmentIds.add(val.getKey().getIdentifier());
+              if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) {
+                return null;
               }
               return new Object[]{
                   val.getKey().getIdentifier(),
@@ -240,7 +277,8 @@ public class SystemSchema extends AbstractSchema
                   val.getKey().getSize(),
                   val.getKey().getVersion(),
                   val.getKey().getShardSpec().getPartitionNum(),
-                  val.getValue().getNumReplicas(),
+                  partialSegmentDataMap.get(val.getKey().getIdentifier()) == 
null ? 0L
+                    : 
partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(),
                   val.getValue().getNumRows(),
                   val.getValue().isPublished(),
                   val.getValue().isAvailable(),
@@ -256,44 +294,8 @@ public class SystemSchema extends AbstractSchema
             }
           });
 
-      //auth check for published segments
-      final CloseableIterator<DataSegment> authorizedPublishedSegments = 
getAuthorizedPublishedSegments(
-          metadataSegments,
-          root
-      );
-      final FluentIterable<Object[]> publishedSegments = FluentIterable
-          .from(() -> authorizedPublishedSegments)
-          .transform(val -> {
-            try {
-              if (availableSegmentIds.contains(val.getIdentifier())) {
-                return null;
-              }
-              return new Object[]{
-                  val.getIdentifier(),
-                  val.getDataSource(),
-                  val.getInterval().getStart(),
-                  val.getInterval().getEnd(),
-                  val.getSize(),
-                  val.getVersion(),
-                  val.getShardSpec().getPartitionNum(),
-                  0L,
-                  -1L,
-                  1L,
-                  0L,
-                  0L,
-                  jsonMapper.writeValueAsString(val)
-              };
-            }
-            catch (JsonProcessingException e) {
-              throw new RuntimeException(StringUtils.format(
-                  "Error getting segment payload for segment %s",
-                  val.getIdentifier()
-              ), e);
-            }
-          });
-
       final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
-          Iterables.concat(availableSegments, publishedSegments));
+          Iterables.concat(publishedSegments, availableSegments));
 
       return Linq4j.asEnumerable(allSegments).where(t -> t != null);
 
@@ -332,6 +334,48 @@ public class SystemSchema extends AbstractSchema
 
       return wrap(authorizedSegments.iterator(), it);
     }
+
+    private static class PartialSegmentData
+    {
+      private final long isAvailable;
+      private final long isRealtime;
+      private final long numReplicas;
+      private final Long numRows;
+
+      public PartialSegmentData(
+          final long isAvailable,
+          final long isRealtime,
+          final long numReplicas,
+          final Long numRows
+      )
+
+      {
+        this.isAvailable = isAvailable;
+        this.isRealtime = isRealtime;
+        this.numReplicas = numReplicas;
+        this.numRows = numRows;
+      }
+
+      public long isAvailable()
+      {
+        return isAvailable;
+      }
+
+      public long isRealtime()
+      {
+        return isRealtime;
+      }
+
+      public long getNumReplicas()
+      {
+        return numReplicas;
+      }
+
+      public Long getNumRows()
+      {
+        return numRows;
+      }
+    }
   }
 
   // Note that coordinator must be up to get segments
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9b90733..8e87ac0 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -462,19 +462,21 @@ public class SystemSchemaTest extends CalciteTestBase
     Enumerator<Object[]> enumerator = rows.enumerator();
 
     Assert.assertEquals(true, enumerator.moveNext());
+    Object[] row1 = enumerator.current();
+    //segment 6 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(1L, row1[9]);
+    Assert.assertEquals(0L, row1[7]);
+
     Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row2 = enumerator.current();
-    //segment 2 is published and has 2 replicas
-    Assert.assertEquals(1L, row2[9]);
-    Assert.assertEquals(2L, row2[7]);
     Assert.assertEquals(true, enumerator.moveNext());
     Assert.assertEquals(true, enumerator.moveNext());
     Assert.assertEquals(true, enumerator.moveNext());
+
+    Object[] row5 = enumerator.current();
+    //segment 2 is published and has 2 replicas
+    Assert.assertEquals(1L, row5[9]);
+    Assert.assertEquals(2L, row5[7]);
     Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row6 = enumerator.current();
-    //segment 6 is published and unavailable, num_replicas is 0
-    Assert.assertEquals(1L, row6[9]);
-    Assert.assertEquals(0L, row6[7]);
     Assert.assertEquals(false, enumerator.moveNext());
 
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to