This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3be4a97 Fix inconsistent segment size(#6448) (#6451)
3be4a97 is described below
commit 3be4a9715061197cee4e121d54b23ccfdd2cbf8c
Author: Surekha <[email protected]>
AuthorDate: Fri Oct 12 12:55:20 2018 -0700
Fix inconsistent segment size(#6448) (#6451)
* Fix inconsistent segment size(#6448)
* Fix the segment size for published segments
* Changes to get numReplicas
* Make coordinator segments API truly streaming
* Changes to store partial segment data
* Simplify SegmentMetadataHolder
* Store the partial columns from available segments
* Address comments
---
.../apache/druid/server/http/MetadataResource.java | 9 +-
.../druid/sql/calcite/schema/DruidSchema.java | 14 +-
.../sql/calcite/schema/SegmentMetadataHolder.java | 12 +-
.../druid/sql/calcite/schema/SystemSchema.java | 142 ++++++++++++++-------
.../druid/sql/calcite/schema/SystemSchemaTest.java | 18 +--
5 files changed, 129 insertions(+), 66 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 781842f..d4db118 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -55,7 +55,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
*/
@@ -151,16 +151,15 @@ public class MetadataResource
public Response getDatabaseSegments(@Context final HttpServletRequest req)
{
final Collection<ImmutableDruidDataSource> druidDataSources =
metadataSegmentManager.getInventory();
- final Set<DataSegment> metadataSegments = druidDataSources
+ final Stream<DataSegment> metadataSegments = druidDataSources
.stream()
- .flatMap(t -> t.getSegments().stream())
- .collect(Collectors.toSet());
+ .flatMap(t -> t.getSegments().stream());
Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment ->
Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));
final Iterable<DataSegment> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(
- req, metadataSegments, raGenerator, authorizerMapper);
+ req, metadataSegments::iterator, raGenerator, authorizerMapper);
final StreamingOutput stream = outputStream -> {
final JsonFactory jsonFactory = jsonMapper.getFactory();
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index 5a64d08..ab43307 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -326,7 +326,13 @@ public class DruidSchema extends AbstractSchema
// segmentReplicatable is used to determine if segments are served by
realtime servers or not
final long isRealtime = server.segmentReplicatable() ? 0 : 1;
final long isPublished = server.getType() == ServerType.HISTORICAL ? 1
: 0;
- SegmentMetadataHolder holder = new
SegmentMetadataHolder.Builder(isPublished, 1, isRealtime, 1).build();
+ final SegmentMetadataHolder holder = new SegmentMetadataHolder.Builder(
+ segment.getIdentifier(),
+ isPublished,
+ 1,
+ isRealtime,
+ 1
+ ).build();
// Unknown segment.
setSegmentSignature(segment, holder);
segmentsNeedingRefresh.add(segment);
@@ -338,8 +344,9 @@ public class DruidSchema extends AbstractSchema
}
} else {
if (knownSegments.containsKey(segment)) {
- SegmentMetadataHolder holder = knownSegments.get(segment);
- SegmentMetadataHolder holderWithNumReplicas = new
SegmentMetadataHolder.Builder(
+ final SegmentMetadataHolder holder = knownSegments.get(segment);
+ final SegmentMetadataHolder holderWithNumReplicas = new
SegmentMetadataHolder.Builder(
+ holder.getSegmentId(),
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
@@ -452,6 +459,7 @@ public class DruidSchema extends AbstractSchema
final Map<DataSegment, SegmentMetadataHolder> dataSourceSegments =
segmentMetadataInfo.get(segment.getDataSource());
SegmentMetadataHolder holder = dataSourceSegments.get(segment);
SegmentMetadataHolder updatedHolder = new
SegmentMetadataHolder.Builder(
+ holder.getSegmentId(),
holder.isPublished(),
holder.isAvailable(),
holder.isRealtime(),
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index f3b13bb..377f502 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -34,7 +34,7 @@ public class SegmentMetadataHolder
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
-
+ private final String segmentId;
private final long numReplicas;
@Nullable
private final RowSignature rowSignature;
@@ -49,6 +49,7 @@ public class SegmentMetadataHolder
this.isRealtime = builder.isRealtime;
this.numReplicas = builder.numReplicas;
this.numRows = builder.numRows;
+ this.segmentId = builder.segmentId;
}
public long isPublished()
@@ -66,6 +67,11 @@ public class SegmentMetadataHolder
return isRealtime;
}
+ public String getSegmentId()
+ {
+ return segmentId;
+ }
+
public long getNumReplicas()
{
return numReplicas;
@@ -85,9 +91,11 @@ public class SegmentMetadataHolder
public static class Builder
{
+ private final String segmentId;
private final long isPublished;
private final long isAvailable;
private final long isRealtime;
+
private long numReplicas;
@Nullable
private RowSignature rowSignature;
@@ -95,12 +103,14 @@ public class SegmentMetadataHolder
private Long numRows;
public Builder(
+ String segmentId,
long isPublished,
long isAvailable,
long isRealtime,
long numReplicas
)
{
+ this.segmentId = segmentId;
this.isPublished = isPublished;
this.isAvailable = isAvailable;
this.isRealtime = isRealtime;
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index da34bbe..e24699d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -48,7 +48,6 @@ import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.segment.column.ValueType;
@@ -77,10 +76,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.stream.Collectors;
public class SystemSchema extends AbstractSchema
{
- private static final Logger log = new Logger(SystemSchema.class);
public static final String NAME = "sys";
private static final String SEGMENTS_TABLE = "segments";
@@ -211,6 +210,13 @@ public class SystemSchema extends AbstractSchema
final Iterator<Entry<DataSegment, SegmentMetadataHolder>>
availableSegmentEntries = availableSegmentMetadata.entrySet()
.iterator();
+ // in memory map to store segment data from available segments
+ final Map<String, PartialSegmentData> partialSegmentDataMap =
availableSegmentMetadata.values().stream().collect(
+ Collectors.toMap(
+ SegmentMetadataHolder::getSegmentId,
+ h -> new PartialSegmentData(h.isAvailable(), h.isRealtime(),
h.getNumReplicas(), h.getNumRows())
+ ));
+
//get published segments from coordinator
final JsonParserIterator<DataSegment> metadataSegments =
getMetadataSegments(
druidLeaderClient,
@@ -218,19 +224,50 @@ public class SystemSchema extends AbstractSchema
responseHandler
);
- Set<String> availableSegmentIds = new HashSet<>();
- //auth check for available segments
- final Iterator<Entry<DataSegment, SegmentMetadataHolder>>
authorizedAvailableSegments = getAuthorizedAvailableSegments(
- availableSegmentEntries,
- root
- );
+ final Set<String> segmentsAlreadySeen = new HashSet<>();
+
+ final FluentIterable<Object[]> publishedSegments = FluentIterable
+ .from(() -> getAuthorizedPublishedSegments(
+ metadataSegments,
+ root
+ ))
+ .transform(val -> {
+ try {
+ segmentsAlreadySeen.add(val.getIdentifier());
+ final PartialSegmentData partialSegmentData =
partialSegmentDataMap.get(val.getIdentifier());
+ return new Object[]{
+ val.getIdentifier(),
+ val.getDataSource(),
+ val.getInterval().getStart(),
+ val.getInterval().getEnd(),
+ val.getSize(),
+ val.getVersion(),
+ val.getShardSpec().getPartitionNum(),
+ partialSegmentData == null ? 0L :
partialSegmentData.getNumReplicas(),
+ partialSegmentData == null ? 0L :
partialSegmentData.getNumRows(),
+ 1L, //is_published is true for published segments
+ partialSegmentData == null ? 1L :
partialSegmentData.isAvailable(),
+ partialSegmentData == null ? 0L :
partialSegmentData.isRealtime(),
+ jsonMapper.writeValueAsString(val)
+ };
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(StringUtils.format(
+ "Error getting segment payload for segment %s",
+ val.getIdentifier()
+ ), e);
+ }
+ });
final FluentIterable<Object[]> availableSegments = FluentIterable
- .from(() -> authorizedAvailableSegments)
+ .from(() -> getAuthorizedAvailableSegments(
+ availableSegmentEntries,
+ root
+ ))
.transform(val -> {
try {
- if (!availableSegmentIds.contains(val.getKey().getIdentifier()))
{
- availableSegmentIds.add(val.getKey().getIdentifier());
+ if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) {
+ return null;
}
return new Object[]{
val.getKey().getIdentifier(),
@@ -240,7 +277,8 @@ public class SystemSchema extends AbstractSchema
val.getKey().getSize(),
val.getKey().getVersion(),
val.getKey().getShardSpec().getPartitionNum(),
- val.getValue().getNumReplicas(),
+ partialSegmentDataMap.get(val.getKey().getIdentifier()) ==
null ? 0L
+ :
partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(),
val.getValue().getNumRows(),
val.getValue().isPublished(),
val.getValue().isAvailable(),
@@ -256,44 +294,8 @@ public class SystemSchema extends AbstractSchema
}
});
- //auth check for published segments
- final CloseableIterator<DataSegment> authorizedPublishedSegments =
getAuthorizedPublishedSegments(
- metadataSegments,
- root
- );
- final FluentIterable<Object[]> publishedSegments = FluentIterable
- .from(() -> authorizedPublishedSegments)
- .transform(val -> {
- try {
- if (availableSegmentIds.contains(val.getIdentifier())) {
- return null;
- }
- return new Object[]{
- val.getIdentifier(),
- val.getDataSource(),
- val.getInterval().getStart(),
- val.getInterval().getEnd(),
- val.getSize(),
- val.getVersion(),
- val.getShardSpec().getPartitionNum(),
- 0L,
- -1L,
- 1L,
- 0L,
- 0L,
- jsonMapper.writeValueAsString(val)
- };
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(StringUtils.format(
- "Error getting segment payload for segment %s",
- val.getIdentifier()
- ), e);
- }
- });
-
final Iterable<Object[]> allSegments = Iterables.unmodifiableIterable(
- Iterables.concat(availableSegments, publishedSegments));
+ Iterables.concat(publishedSegments, availableSegments));
return Linq4j.asEnumerable(allSegments).where(t -> t != null);
@@ -332,6 +334,48 @@ public class SystemSchema extends AbstractSchema
return wrap(authorizedSegments.iterator(), it);
}
+
+ private static class PartialSegmentData
+ {
+ private final long isAvailable;
+ private final long isRealtime;
+ private final long numReplicas;
+ private final Long numRows;
+
+ public PartialSegmentData(
+ final long isAvailable,
+ final long isRealtime,
+ final long numReplicas,
+ final Long numRows
+ )
+
+ {
+ this.isAvailable = isAvailable;
+ this.isRealtime = isRealtime;
+ this.numReplicas = numReplicas;
+ this.numRows = numRows;
+ }
+
+ public long isAvailable()
+ {
+ return isAvailable;
+ }
+
+ public long isRealtime()
+ {
+ return isRealtime;
+ }
+
+ public long getNumReplicas()
+ {
+ return numReplicas;
+ }
+
+ public Long getNumRows()
+ {
+ return numRows;
+ }
+ }
}
// Note that coordinator must be up to get segments
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 9b90733..8e87ac0 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -462,19 +462,21 @@ public class SystemSchemaTest extends CalciteTestBase
Enumerator<Object[]> enumerator = rows.enumerator();
Assert.assertEquals(true, enumerator.moveNext());
+ Object[] row1 = enumerator.current();
+ //segment 6 is published and unavailable, num_replicas is 0
+ Assert.assertEquals(1L, row1[9]);
+ Assert.assertEquals(0L, row1[7]);
+
Assert.assertEquals(true, enumerator.moveNext());
- Object[] row2 = enumerator.current();
- //segment 2 is published and has 2 replicas
- Assert.assertEquals(1L, row2[9]);
- Assert.assertEquals(2L, row2[7]);
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
+
+ Object[] row5 = enumerator.current();
+ //segment 2 is published and has 2 replicas
+ Assert.assertEquals(1L, row5[9]);
+ Assert.assertEquals(2L, row5[7]);
Assert.assertEquals(true, enumerator.moveNext());
- Object[] row6 = enumerator.current();
- //segment 6 is published and unavailable, num_replicas is 0
- Assert.assertEquals(1L, row6[9]);
- Assert.assertEquals(0L, row6[7]);
Assert.assertEquals(false, enumerator.moveNext());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]