fjy closed pull request #6451: Fix inconsistent segment size (#6448)
URL: https://github.com/apache/incubator-druid/pull/6451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be visible, since GitHub hides the original diff of merged fork PRs):

diff --git 
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java 
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 781842f4fee..d4db1186710 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -55,7 +55,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  */
@@ -151,16 +151,15 @@ public Response getDatabaseSegmentDataSource(
   public Response getDatabaseSegments(@Context final HttpServletRequest req)
   {
     final Collection<ImmutableDruidDataSource> druidDataSources = 
metadataSegmentManager.getInventory();
-    final Set<DataSegment> metadataSegments = druidDataSources
+    final Stream<DataSegment> metadataSegments = druidDataSources
         .stream()
-        .flatMap(t -> t.getSegments().stream())
-        .collect(Collectors.toSet());
+        .flatMap(t -> t.getSegments().stream());
 
     Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> 
Collections.singletonList(
         
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
 
     final Iterable<DataSegment> authorizedSegments = 
AuthorizationUtils.filterAuthorizedResources(
-        req, metadataSegments, raGenerator, authorizerMapper);
+        req, metadataSegments::iterator, raGenerator, authorizerMapper);
 
     final StreamingOutput stream = outputStream -> {
       final JsonFactory jsonFactory = jsonMapper.getFactory();
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 5a64d08700c..ab43307527c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -326,7 +326,13 @@ private void addSegment(final DruidServerMetadata server, 
final DataSegment segm
         // segmentReplicatable is used to determine if segments are served by 
realtime servers or not
         final long isRealtime = server.segmentReplicatable() ? 0 : 1;
         final long isPublished = server.getType() == ServerType.HISTORICAL ? 1 
: 0;
-        SegmentMetadataHolder holder = new 
SegmentMetadataHolder.Builder(isPublished, 1, isRealtime, 1).build();
+        final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
+            segment.getIdentifier(),
+            isPublished,
+            1,
+            isRealtime,
+            1
+        ).build();
         // Unknown segment.
         setSegmentSignature(segment, holder);
         segmentsNeedingRefresh.add(segment);
@@ -338,8 +344,9 @@ private void addSegment(final DruidServerMetadata server, 
final DataSegment segm
         }
       } else {
         if (knownSegments.containsKey(segment)) {
-          SegmentMetadataHolder holder = knownSegments.get(segment);
-          SegmentMetadataHolder holderWithNumReplicas = new 
SegmentMetadataHolder.Builder(
+          final SegmentMetadataHolder holder = knownSegments.get(segment);
+          final SegmentMetadataHolder holderWithNumReplicas = new 
SegmentMetadataHolder.Builder(
+              holder.getSegmentId(),
               holder.isPublished(),
               holder.isAvailable(),
               holder.isRealtime(),
@@ -452,6 +459,7 @@ private void removeSegment(final DataSegment segment)
             final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments = 
segmentMetadataInfo.get(segment.getDataSource());
             SegmentMetadataHolder holder = dataSourceSegments.get(segment);
             SegmentMetadataHolder updatedHolder = new 
SegmentMetadataHolder.Builder(
+                holder.getSegmentId(),
                 holder.isPublished(),
                 holder.isAvailable(),
                 holder.isRealtime(),
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index f3b13bb8095..377f5020cdc 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -34,7 +34,7 @@
   private final long isPublished;
   private final long isAvailable;
   private final long isRealtime;
-
+  private final String segmentId;
   private final long numReplicas;
   @Nullable
   private final RowSignature rowSignature;
@@ -49,6 +49,7 @@ private SegmentMetadataHolder(Builder builder)
     this.isRealtime = builder.isRealtime;
     this.numReplicas = builder.numReplicas;
     this.numRows = builder.numRows;
+    this.segmentId = builder.segmentId;
   }
 
   public long isPublished()
@@ -66,6 +67,11 @@ public long isRealtime()
     return isRealtime;
   }
 
+  public String getSegmentId()
+  {
+    return segmentId;
+  }
+
   public long getNumReplicas()
   {
     return numReplicas;
@@ -85,9 +91,11 @@ public RowSignature getRowSignature()
 
   public static class Builder
   {
+    private final String segmentId;
     private final long isPublished;
     private final long isAvailable;
     private final long isRealtime;
+
     private long numReplicas;
     @Nullable
     private RowSignature rowSignature;
@@ -95,12 +103,14 @@ public RowSignature getRowSignature()
     private Long numRows;
 
     public Builder(
+        String segmentId,
         long isPublished,
         long isAvailable,
         long isRealtime,
         long numReplicas
     )
     {
+      this.segmentId = segmentId;
       this.isPublished = isPublished;
       this.isAvailable = isAvailable;
       this.isRealtime = isRealtime;
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index da34bbeeacd..e24699d61a2 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -48,7 +48,6 @@
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
@@ -77,10 +76,10 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class SystemSchema extends AbstractSchema
 {
-  private static final Logger log = new Logger(SystemSchema.class);
 
   public static final String NAME = "sys";
   private static final String SEGMENTS_TABLE = "segments";
@@ -211,6 +210,13 @@ public TableType getJdbcTableType()
       final Iterator<Entry<DataSegment, SegmentMetadataHolder>> 
availableSegmentEntries = availableSegmentMetadata.entrySet()
                                                                                
                                   .iterator();
 
+      // in memory map to store segment data from available segments
+      final Map<String, PartialSegmentData> partialSegmentDataMap = 
availableSegmentMetadata.values().stream().collect(
+          Collectors.toMap(
+              SegmentMetadataHolder::getSegmentId,
+              h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(), 
h.getNumReplicas(), h.getNumRows())
+          ));
+
       //get published segments from coordinator
       final JsonParserIterator<DataSegment> metadataSegments = 
getMetadataSegments(
           druidLeaderClient,
@@ -218,19 +224,50 @@ public TableType getJdbcTableType()
           responseHandler
       );
 
-      Set<String> availableSegmentIds = new HashSet<>();
-      //auth check for available segments
-      final Iterator<Entry<DataSegment, SegmentMetadataHolder>> 
authorizedAvailableSegments = getAuthorizedAvailableSegments(
-          availableSegmentEntries,
-          root
-      );
+      final Set<String> segmentsAlreadySeen = new HashSet<>();
+
+      final FluentIterable<Object[]> publishedSegments = FluentIterable
+          .from(() -> getAuthorizedPublishedSegments(
+              metadataSegments,
+              root
+          ))
+          .transform(val -> {
+            try {
+              segmentsAlreadySeen.add(val.getIdentifier());
+              final PartialSegmentData partialSegmentData = 
partialSegmentDataMap.get(val.getIdentifier());
+              return new Object[]{
+                  val.getIdentifier(),
+                  val.getDataSource(),
+                  val.getInterval().getStart(),
+                  val.getInterval().getEnd(),
+                  val.getSize(),
+                  val.getVersion(),
+                  val.getShardSpec().getPartitionNum(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.getNumReplicas(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.getNumRows(),
+                  1L, //is_published is true for published segments
+                  partialSegmentData == null ? 1L : 
partialSegmentData.isAvailable(),
+                  partialSegmentData == null ? 0L : 
partialSegmentData.isRealtime(),
+                  jsonMapper.writeValueAsString(val)
+              };
+            }
+            catch (JsonProcessingException e) {
+              throw new RuntimeException(StringUtils.format(
+                  "Error getting segment payload for segment %s",
+                  val.getIdentifier()
+              ), e);
+            }
+          });
 
       final FluentIterable<Object[]> availableSegments = FluentIterable
-          .from(() -> authorizedAvailableSegments)
+          .from(() -> getAuthorizedAvailableSegments(
+              availableSegmentEntries,
+              root
+          ))
           .transform(val -> {
             try {
-              if (!availableSegmentIds.contains(val.getKey().getIdentifier())) 
{
-                availableSegmentIds.add(val.getKey().getIdentifier());
+              if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) {
+                return null;
               }
               return new Object[]{
                   val.getKey().getIdentifier(),
@@ -240,7 +277,8 @@ public TableType getJdbcTableType()
                   val.getKey().getSize(),
                   val.getKey().getVersion(),
                   val.getKey().getShardSpec().getPartitionNum(),
-                  val.getValue().getNumReplicas(),
+                  partialSegmentDataMap.get(val.getKey().getIdentifier()) == 
null ? 0L
+                    : 
partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(),
                   val.getValue().getNumRows(),
                   val.getValue().isPublished(),
                   val.getValue().isAvailable(),
@@ -256,44 +294,8 @@ public TableType getJdbcTableType()
             }
           });
 
-      //auth check for published segments
-      final CloseableIterator<DataSegment> authorizedPublishedSegments = 
getAuthorizedPublishedSegments(
-          metadataSegments,
-          root
-      );
-      final FluentIterable<Object[]> publishedSegments = FluentIterable
-          .from(() -> authorizedPublishedSegments)
-          .transform(val -> {
-            try {
-              if (availableSegmentIds.contains(val.getIdentifier())) {
-                return null;
-              }
-              return new Object[]{
-                  val.getIdentifier(),
-                  val.getDataSource(),
-                  val.getInterval().getStart(),
-                  val.getInterval().getEnd(),
-                  val.getSize(),
-                  val.getVersion(),
-                  val.getShardSpec().getPartitionNum(),
-                  0L,
-                  -1L,
-                  1L,
-                  0L,
-                  0L,
-                  jsonMapper.writeValueAsString(val)
-              };
-            }
-            catch (JsonProcessingException e) {
-              throw new RuntimeException(StringUtils.format(
-                  "Error getting segment payload for segment %s",
-                  val.getIdentifier()
-              ), e);
-            }
-          });
-
       final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
-          Iterables.concat(availableSegments, publishedSegments));
+          Iterables.concat(publishedSegments, availableSegments));
 
       return Linq4j.asEnumerable(allSegments).where(t -> t != null);
 
@@ -332,6 +334,48 @@ public TableType getJdbcTableType()
 
       return wrap(authorizedSegments.iterator(), it);
     }
+
+    private static class PartialSegmentData
+    {
+      private final long isAvailable;
+      private final long isRealtime;
+      private final long numReplicas;
+      private final Long numRows;
+
+      public PartialSegmentData(
+          final long isAvailable,
+          final long isRealtime,
+          final long numReplicas,
+          final Long numRows
+      )
+
+      {
+        this.isAvailable = isAvailable;
+        this.isRealtime = isRealtime;
+        this.numReplicas = numReplicas;
+        this.numRows = numRows;
+      }
+
+      public long isAvailable()
+      {
+        return isAvailable;
+      }
+
+      public long isRealtime()
+      {
+        return isRealtime;
+      }
+
+      public long getNumReplicas()
+      {
+        return numReplicas;
+      }
+
+      public Long getNumRows()
+      {
+        return numRows;
+      }
+    }
   }
 
   // Note that coordinator must be up to get segments
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9b9073344dc..8e87ac0ba27 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -462,19 +462,21 @@ public Object get(String name)
     Enumerator<Object[]> enumerator = rows.enumerator();
 
     Assert.assertEquals(true, enumerator.moveNext());
+    Object[] row1 = enumerator.current();
+    //segment 6 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(1L, row1[9]);
+    Assert.assertEquals(0L, row1[7]);
+
     Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row2 = enumerator.current();
-    //segment 2 is published and has 2 replicas
-    Assert.assertEquals(1L, row2[9]);
-    Assert.assertEquals(2L, row2[7]);
     Assert.assertEquals(true, enumerator.moveNext());
     Assert.assertEquals(true, enumerator.moveNext());
     Assert.assertEquals(true, enumerator.moveNext());
+
+    Object[] row5 = enumerator.current();
+    //segment 2 is published and has 2 replicas
+    Assert.assertEquals(1L, row5[9]);
+    Assert.assertEquals(2L, row5[7]);
     Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row6 = enumerator.current();
-    //segment 6 is published and unavailable, num_replicas is 0
-    Assert.assertEquals(1L, row6[9]);
-    Assert.assertEquals(0L, row6[7]);
     Assert.assertEquals(false, enumerator.moveNext());
 
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to