This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new e908fd6 Add check for nullable numRows (#6460)
e908fd6 is described below
commit e908fd6db72fc13c83634febf66c4a3dc42fc7a7
Author: Surekha <[email protected]>
AuthorDate: Sat Oct 13 15:08:42 2018 -0700
Add check for nullable numRows (#6460)
* Add check for nullable numRows
* Make numRows long instead of Long type
* Add check for numRows in unit test
* small refactoring
* Modify test
PR comment from https://github.com/apache/incubator-druid/pull/6094#pullrequestreview-163937783
* Add a test for serverSegments table
* update tests
---
.../sql/calcite/schema/SegmentMetadataHolder.java | 11 +--
.../druid/sql/calcite/schema/SystemSchema.java | 23 +++--
.../druid/sql/calcite/schema/SystemSchemaTest.java | 107 ++++++++++++++++-----
3 files changed, 100 insertions(+), 41 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
index 377f502..cb446e9 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SegmentMetadataHolder.java
@@ -36,10 +36,9 @@ public class SegmentMetadataHolder
private final long isRealtime;
private final String segmentId;
private final long numReplicas;
+ private final long numRows;
@Nullable
private final RowSignature rowSignature;
- @Nullable
- private final Long numRows;
private SegmentMetadataHolder(Builder builder)
{
@@ -77,8 +76,7 @@ public class SegmentMetadataHolder
return numReplicas;
}
- @Nullable
- public Long getNumRows()
+ public long getNumRows()
{
return numRows;
}
@@ -99,8 +97,7 @@ public class SegmentMetadataHolder
private long numReplicas;
@Nullable
private RowSignature rowSignature;
- @Nullable
- private Long numRows;
+ private long numRows;
public Builder(
String segmentId,
@@ -123,7 +120,7 @@ public class SegmentMetadataHolder
return this;
}
- public Builder withNumRows(Long numRows)
+ public Builder withNumRows(long numRows)
{
this.numRows = numRows;
return this;
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index e24699d..8e219a6 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -235,6 +235,13 @@ public class SystemSchema extends AbstractSchema
try {
segmentsAlreadySeen.add(val.getIdentifier());
final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getIdentifier());
+ long numReplicas = 0L, numRows = 0L, isRealtime = 0L, isAvailable = 1L;
+ if (partialSegmentData != null) {
+ numReplicas = partialSegmentData.getNumReplicas();
+ numRows = partialSegmentData.getNumRows();
+ isAvailable = partialSegmentData.isAvailable();
+ isRealtime = partialSegmentData.isRealtime();
+ }
return new Object[]{
val.getIdentifier(),
val.getDataSource(),
@@ -243,11 +250,11 @@ public class SystemSchema extends AbstractSchema
val.getSize(),
val.getVersion(),
val.getShardSpec().getPartitionNum(),
- partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas(),
- partialSegmentData == null ? 0L : partialSegmentData.getNumRows(),
+ numReplicas,
+ numRows,
1L, //is_published is true for published segments
- partialSegmentData == null ? 1L : partialSegmentData.isAvailable(),
- partialSegmentData == null ? 0L : partialSegmentData.isRealtime(),
+ isAvailable,
+ isRealtime,
jsonMapper.writeValueAsString(val)
};
}
@@ -340,13 +347,13 @@ public class SystemSchema extends AbstractSchema
private final long isAvailable;
private final long isRealtime;
private final long numReplicas;
- private final Long numRows;
+ private final long numRows;
public PartialSegmentData(
final long isAvailable,
final long isRealtime,
final long numReplicas,
- final Long numRows
+ final long numRows
)
{
@@ -371,7 +378,7 @@ public class SystemSchema extends AbstractSchema
return numReplicas;
}
- public Long getNumRows()
+ public long getNumRows()
{
return numRows;
}
@@ -466,7 +473,7 @@ public class SystemSchema extends AbstractSchema
}
}
- private static class ServerSegmentsTable extends AbstractTable implements ScannableTable
+ static class ServerSegmentsTable extends AbstractTable implements ScannableTable
{
private final TimelineServerView serverView;
final AuthorizerMapper authorizerMapper;
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 8e87ac0..528d972 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -34,24 +34,19 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DruidLeaderClient;
-import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
-import org.apache.druid.query.QueryRunnerTestHelper;
-import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -64,7 +59,6 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Authorizer;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -284,25 +278,6 @@ public class SystemSchemaTest extends CalciteTestBase
DataSegment.PruneLoadSpecHolder.DEFAULT
);
- private final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
- private final DirectDruidClient client1 = new DirectDruidClient(
- new ReflectionQueryToolChestWarehouse(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- new DefaultObjectMapper(),
- httpClient,
- "http",
- "foo",
- new NoopServiceEmitter()
- );
- private final DirectDruidClient client2 = new DirectDruidClient(
- new ReflectionQueryToolChestWarehouse(),
- QueryRunnerTestHelper.NOOP_QUERYWATCHER,
- new DefaultObjectMapper(),
- httpClient,
- "http",
- "foo2",
- new NoopServiceEmitter()
- );
private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
1L,
@@ -314,7 +289,7 @@ public class SystemSchemaTest extends CalciteTestBase
new DruidServerMetadata("server2", "server2:1234", null, 5L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
1L,
null,
- ImmutableMap.of("segment2", segment2, "segment4", segment4, "segment5", segment5)
+ ImmutableMap.of("segment3", segment3, "segment4", segment4, "segment5", segment5)
);
private final List<ImmutableDruidServer> immutableDruidServers = ImmutableList.of(druidServer1, druidServer2);
@@ -466,6 +441,7 @@ public class SystemSchemaTest extends CalciteTestBase
//segment 6 is published and unavailable, num_replicas is 0
Assert.assertEquals(1L, row1[9]);
Assert.assertEquals(0L, row1[7]);
+ Assert.assertEquals(0L, row1[8]); //numRows = 0
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(true, enumerator.moveNext());
@@ -476,6 +452,7 @@ public class SystemSchemaTest extends CalciteTestBase
//segment 2 is published and has 2 replicas
Assert.assertEquals(1L, row5[9]);
Assert.assertEquals(2L, row5[7]);
+ Assert.assertEquals(3L, row5[8]); //numRows = 3
Assert.assertEquals(true, enumerator.moveNext());
Assert.assertEquals(false, enumerator.moveNext());
@@ -529,6 +506,84 @@ public class SystemSchemaTest extends CalciteTestBase
}
@Test
+ public void testServerSegmentsTable()
+ {
+ SystemSchema.ServerSegmentsTable serverSegmentsTable = EasyMock.createMockBuilder(SystemSchema.ServerSegmentsTable.class)
+ .withConstructor(serverView, authMapper)
+ .createMock();
+ EasyMock.replay(serverSegmentsTable);
+ EasyMock.expect(serverView.getDruidServers())
+ .andReturn(immutableDruidServers)
+ .once();
+ EasyMock.replay(serverView);
+ DataContext dataContext = new DataContext()
+ {
+ @Override
+ public SchemaPlus getRootSchema()
+ {
+ return null;
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory()
+ {
+ return null;
+ }
+
+ @Override
+ public QueryProvider getQueryProvider()
+ {
+ return null;
+ }
+
+ @Override
+ public Object get(String name)
+ {
+ return CalciteTests.SUPER_USER_AUTH_RESULT;
+ }
+ };
+ Enumerable<Object[]> rows = serverSegmentsTable.scan(dataContext);
+ Assert.assertEquals(5, rows.count());
+
+ //server_segments table is the join of servers and segments table
+ // it will have 5 rows as follows
+ // localhost:0000 | test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1(segment1)
+ // localhost:0000 | test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2(segment2)
+ // server2:1234 | test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3(segment3)
+ // server2:1234 | test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4)
+ // server2:1234 | test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5)
+
+ Enumerator<Object[]> enumerator = rows.enumerator();
+ Assert.assertEquals(true, enumerator.moveNext());
+
+ Object[] row1 = rows.first();
+ Assert.assertEquals("localhost:0000", row1[0]);
+ Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]);
+
+ Assert.assertEquals(true, enumerator.moveNext());
+ Object[] row2 = enumerator.current();
+ Assert.assertEquals("localhost:0000", row2[0]);
+ Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]);
+
+ Assert.assertEquals(true, enumerator.moveNext());
+ Object[] row3 = enumerator.current();
+ Assert.assertEquals("server2:1234", row3[0]);
+ Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]);
+
+ Assert.assertEquals(true, enumerator.moveNext());
+ Object[] row4 = enumerator.current();
+ Assert.assertEquals("server2:1234", row4[0]);
+ Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]);
+
+ Assert.assertEquals(true, enumerator.moveNext());
+ Object[] row5 = rows.last();
+ Assert.assertEquals("server2:1234", row5[0]);
+ Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]);
+
+ Assert.assertEquals(false, enumerator.moveNext());
+ }
+
+ @Test
public void testTasksTable() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]