This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a0a2171d34 Fix the time segment pruner on TIMESTAMP data type (#12789)
a0a2171d34 is described below
commit a0a2171d349979461c34e58b1ec7f98f6e8e615b
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Apr 7 18:22:10 2024 -0700
Fix the time segment pruner on TIMESTAMP data type (#12789)
---
.../segmentpruner/SegmentPrunerFactory.java | 44 +-
.../routing/segmentpruner/TimeSegmentPruner.java | 136 ++---
.../routing/segmentpruner/SegmentPrunerTest.java | 621 +++++++++------------
.../org/apache/pinot/common/data/SchemaTest.java | 29 +-
.../apache/pinot/spi/data/DateTimeFieldSpec.java | 17 +-
5 files changed, 375 insertions(+), 472 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 6135982e18..423eb527fa 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -37,7 +35,6 @@ import
org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +65,7 @@ public class SegmentPrunerFactory {
List<SegmentPruner> configuredSegmentPruners = new
ArrayList<>(segmentPrunerTypes.size());
for (String segmentPrunerType : segmentPrunerTypes) {
if
(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType))
{
- SegmentPruner partitionSegmentPruner =
getPartitionSegmentPruner(tableConfig, propertyStore);
+ SegmentPruner partitionSegmentPruner =
getPartitionSegmentPruner(tableConfig);
if (partitionSegmentPruner != null) {
configuredSegmentPruners.add(partitionSegmentPruner);
}
@@ -91,7 +88,7 @@ public class SegmentPrunerFactory {
if ((tableType == TableType.OFFLINE &&
LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
routingTableBuilderName)) || (tableType == TableType.REALTIME
&&
LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName)))
{
- SegmentPruner partitionSegmentPruner =
getPartitionSegmentPruner(tableConfig, propertyStore);
+ SegmentPruner partitionSegmentPruner =
getPartitionSegmentPruner(tableConfig);
if (partitionSegmentPruner != null) {
segmentPruners.add(partitionSegmentPruner);
}
@@ -102,8 +99,7 @@ public class SegmentPrunerFactory {
}
@Nullable
- private static SegmentPruner getPartitionSegmentPruner(TableConfig
tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ private static SegmentPruner getPartitionSegmentPruner(TableConfig
tableConfig) {
String tableNameWithType = tableConfig.getTableName();
SegmentPartitionConfig segmentPartitionConfig =
tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (segmentPartitionConfig == null) {
@@ -137,26 +133,20 @@ public class SegmentPrunerFactory {
LOGGER.warn("Cannot enable time range pruning without time column for
table: {}", tableNameWithType);
return null;
}
- return createTimeSegmentPruner(tableConfig, propertyStore);
- }
-
- @VisibleForTesting
- static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
- String tableNameWithType = tableConfig.getTableName();
- String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
- Preconditions.checkNotNull(timeColumn, "Time column must be configured in
table config for table: %s",
- tableNameWithType);
- Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableNameWithType);
- Preconditions.checkNotNull(schema, "Failed to find schema for table: %s",
tableNameWithType);
- DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
- Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in
schema for time column: %s of table: %s",
- timeColumn, tableNameWithType);
- DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
-
- LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with
DateTimeFormatSpec: {}",
- timeColumn, tableNameWithType, timeFormatSpec);
- return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableConfig);
+ if (schema == null) {
+ LOGGER.warn("Cannot enable time range pruning without schema for table:
{}", tableNameWithType);
+ return null;
+ }
+ DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
+ if (timeFieldSpec == null) {
+ LOGGER.warn("Cannot enable time range pruning without field spec for
table: {}, time column: {}",
+ tableNameWithType, timeColumn);
+ return null;
+ }
+ LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with
DateTimeFieldSpec: {}", timeColumn,
+ tableNameWithType, timeFieldSpec);
+ return new TimeSegmentPruner(tableConfig, timeFieldSpec);
}
private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner>
pruners) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index a7ac4fce4b..c2e6b20cce 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,7 +38,9 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Literal;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
@@ -64,10 +67,10 @@ public class TimeSegmentPruner implements SegmentPruner {
private volatile IntervalTree<String> _intervalTree;
private final Map<String, Interval> _intervalMap = new HashMap<>();
- public TimeSegmentPruner(TableConfig tableConfig, String timeColumn,
DateTimeFormatSpec timeFormatSpec) {
+ public TimeSegmentPruner(TableConfig tableConfig, DateTimeFieldSpec
timeFieldSpec) {
_tableNameWithType = tableConfig.getTableName();
- _timeColumn = timeColumn;
- _timeFormatSpec = timeFormatSpec;
+ _timeColumn = timeFieldSpec.getName();
+ _timeFormatSpec = timeFieldSpec.getFormatSpec();
}
@Override
@@ -206,97 +209,53 @@ public class TimeSegmentPruner implements SegmentPruner {
} else {
return getComplementSortedIntervals(childIntervals);
}
- case EQUALS: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp =
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp, timeStamp));
- } else {
- return null;
+ case EQUALS:
+ if (isTimeColumn(operands.get(0))) {
+ long timestamp = toMillisSinceEpoch(operands.get(1));
+ return List.of(new Interval(timestamp, timestamp));
}
- }
- case IN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
+ return null;
+ case IN:
+ if (isTimeColumn(operands.get(0))) {
int numOperands = operands.size();
List<Interval> intervals = new ArrayList<>(numOperands - 1);
for (int i = 1; i < numOperands; i++) {
- long timeStamp =
-
_timeFormatSpec.fromFormatToMillis(operands.get(i).getLiteral().getFieldValue().toString());
- intervals.add(new Interval(timeStamp, timeStamp));
+ long timestamp = toMillisSinceEpoch(operands.get(i));
+ intervals.add(new Interval(timestamp, timestamp));
}
return intervals;
- } else {
- return null;
}
- }
- case GREATER_THAN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp =
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp + 1,
MAX_END_TIME));
- } else {
- return null;
+ return null;
+ case GREATER_THAN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)) + 1,
MAX_END_TIME);
}
- }
- case GREATER_THAN_OR_EQUAL: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp =
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp,
MAX_END_TIME));
- } else {
- return null;
+ return null;
+ case GREATER_THAN_OR_EQUAL:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)),
MAX_END_TIME);
}
- }
- case LESS_THAN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp =
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- if (timeStamp > MIN_START_TIME) {
- return Collections.singletonList(new Interval(MIN_START_TIME,
timeStamp - 1));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case LESS_THAN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(MIN_START_TIME,
toMillisSinceEpoch(operands.get(1)) - 1);
}
- }
- case LESS_THAN_OR_EQUAL: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp =
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- if (timeStamp >= MIN_START_TIME) {
- return Collections.singletonList(new Interval(MIN_START_TIME,
timeStamp));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case LESS_THAN_OR_EQUAL:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(MIN_START_TIME,
toMillisSinceEpoch(operands.get(1)));
}
- }
- case BETWEEN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long startTimestamp =
-
_timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- long endTimestamp =
-
_timeFormatSpec.fromFormatToMillis(operands.get(2).getLiteral().getFieldValue().toString());
- if (endTimestamp >= startTimestamp) {
- return Collections.singletonList(new Interval(startTimestamp,
endTimestamp));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case BETWEEN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)),
toMillisSinceEpoch(operands.get(2)));
}
- }
- case RANGE: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
+ return null;
+ case RANGE:
+ if (isTimeColumn(operands.get(0))) {
return
parseInterval(operands.get(1).getLiteral().getFieldValue().toString());
}
return null;
- }
default:
return null;
}
@@ -408,6 +367,17 @@ public class TimeSegmentPruner implements SegmentPruner {
return res;
}
+ private boolean isTimeColumn(Expression expression) {
+ Identifier identifier = expression.getIdentifier();
+ return identifier != null && identifier.getName().equals(_timeColumn);
+ }
+
+ private long toMillisSinceEpoch(Expression expression) {
+ Literal literal = expression.getLiteral();
+ Preconditions.checkArgument(literal != null, "Literal is required for time
column filter, got: %s", expression);
+ return
_timeFormatSpec.fromFormatToMillis(literal.getFieldValue().toString());
+ }
+
/**
* Parse interval to millisecond as [min, max] with both sides included.
* E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as
[1456, 16310]
@@ -432,10 +402,10 @@ public class TimeSegmentPruner implements SegmentPruner {
endTime--;
}
}
+ return getInterval(startTime, endTime);
+ }
- if (startTime > endTime) {
- return Collections.emptyList();
- }
- return Collections.singletonList(new Interval(startTime, endTime));
+ private static List<Interval> getInterval(long inclusiveStart, long
inclusiveEnd) {
+ return inclusiveStart <= inclusiveEnd ? List.of(new
Interval(inclusiveStart, inclusiveEnd)) : List.of();
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index feaad35169..5e48a981cc 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -18,8 +18,7 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import java.util.Arrays;
-import java.util.Collections;
+import java.sql.Timestamp;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,11 +49,11 @@ import
org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.mockito.Mockito;
@@ -78,29 +77,45 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_PATTERN = "yyyyMMdd";
private static final String QUERY_1 = "SELECT * FROM testTable";
- private static final String QUERY_2 = "SELECT * FROM testTable where
memberId = 0";
- private static final String QUERY_3 = "SELECT * FROM testTable where
memberId IN (1, 2)";
- private static final String QUERY_4 = "SELECT * FROM testTable where
memberId = 0 AND memberName='xyz'";
-
- private static final String TIME_QUERY_1 = "SELECT * FROM testTable where
timeColumn = 40";
- private static final String TIME_QUERY_2 = "SELECT * FROM testTable where
timeColumn BETWEEN 20 AND 30";
- private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30
< timeColumn AND timeColumn <= 50";
- private static final String TIME_QUERY_4 = "SELECT * FROM testTable where
timeColumn < 15 OR timeColumn > 45";
+ private static final String QUERY_2 = "SELECT * FROM testTable WHERE
memberId = 0";
+ private static final String QUERY_3 = "SELECT * FROM testTable WHERE
memberId IN (1, 2)";
+ private static final String QUERY_4 = "SELECT * FROM testTable WHERE
memberId = 0 AND memberName = 'xyz'";
+
+ private static final String TIME_QUERY_1 = "SELECT * FROM testTable WHERE
timeColumn = 40";
+ private static final String TIME_QUERY_2 = "SELECT * FROM testTable WHERE
timeColumn BETWEEN 20 AND 30";
+ private static final String TIME_QUERY_3 = "SELECT * FROM testTable WHERE 30
< timeColumn AND timeColumn <= 50";
+ private static final String TIME_QUERY_4 = "SELECT * FROM testTable WHERE
timeColumn < 15 OR timeColumn > 45";
private static final String TIME_QUERY_5 =
- "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND
timeColumn < 70)";
- private static final String TIME_QUERY_6 = "SELECT * FROM testTable where
timeColumn < 0 AND timeColumn > 0";
+ "SELECT * FROM testTable WHERE timeColumn < 15 OR (60 < timeColumn AND
timeColumn < 70)";
+ private static final String TIME_QUERY_6 = "SELECT * FROM testTable WHERE
timeColumn NOT BETWEEN 20 AND 30";
+ private static final String TIME_QUERY_7 = "SELECT * FROM testTable WHERE
NOT timeColumn > 30";
+ private static final String TIME_QUERY_8 = "SELECT * FROM testTable WHERE
timeColumn < 0 AND timeColumn > 0";
- private static final String SDF_QUERY_1 = "SELECT * FROM testTable where
timeColumn = 20200131";
- private static final String SDF_QUERY_2 = "SELECT * FROM testTable where
timeColumn BETWEEN 20200101 AND 20200331";
+ private static final String SDF_QUERY_1 = "SELECT * FROM testTable WHERE
timeColumn = 20200131";
+ private static final String SDF_QUERY_2 = "SELECT * FROM testTable WHERE
timeColumn BETWEEN 20200101 AND 20200331";
private static final String SDF_QUERY_3 =
- "SELECT * FROM testTable where 20200430 < timeColumn AND timeColumn <
20200630";
+ "SELECT * FROM testTable WHERE 20200430 < timeColumn AND timeColumn <
20200630";
private static final String SDF_QUERY_4 =
- "SELECT * FROM testTable where timeColumn <= 20200101 OR timeColumn in
(20200201, 20200401)";
+ "SELECT * FROM testTable WHERE timeColumn <= 20200101 OR timeColumn IN
(20200201, 20200401)";
private static final String SDF_QUERY_5 =
- "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND
timeColumn >= 20200530";
-
- private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable
WHERE timeColumn NOT BETWEEN 20 AND 30";
- private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable
WHERE NOT timeColumn > 30";
+ "SELECT * FROM testTable WHERE timeColumn IN (20200101, 20200102) AND
timeColumn >= 20200530";
+
+ // Timestamp can be passed as string or long
+ private static final String TIMESTAMP_QUERY_1 = "SELECT * FROM testTable
WHERE timeColumn = '2020-01-31 00:00:00'";
+ private static final String TIMESTAMP_QUERY_2 = String.format("SELECT * FROM
testTable WHERE timeColumn = %d",
+ Timestamp.valueOf("2020-01-31 00:00:00").getTime());
+ private static final String TIMESTAMP_QUERY_3 =
+ "SELECT * FROM testTable WHERE timeColumn BETWEEN '2020-01-01 00:00:00'
AND '2020-03-31 00:00:00'";
+ private static final String TIMESTAMP_QUERY_4 =
+ String.format("SELECT * FROM testTable WHERE timeColumn BETWEEN %d AND
%d",
+ Timestamp.valueOf("2020-01-01 00:00:00").getTime(),
Timestamp.valueOf("2020-03-31 00:00:00").getTime());
+ private static final String TIMESTAMP_QUERY_5 =
+ "SELECT * FROM testTable WHERE timeColumn <= '2020-01-01 00:00:00' OR
timeColumn IN ('2020-02-01 00:00:00', "
+ + "'2020-04-01 00:00:00')";
+ private static final String TIMESTAMP_QUERY_6 =
+ String.format("SELECT * FROM testTable WHERE timeColumn <= %d OR
timeColumn IN (%d, %d)",
+ Timestamp.valueOf("2020-01-01 00:00:00").getTime(),
Timestamp.valueOf("2020-02-01 00:00:00").getTime(),
+ Timestamp.valueOf("2020-04-01 00:00:00").getTime());
// this is duplicate with KinesisConfig.STREAM_TYPE, while instead of use
KinesisConfig.STREAM_TYPE directly, we
// hardcode the value here to avoid pulling the entire pinot-kinesis module
as dependency.
@@ -127,6 +142,7 @@ public class SegmentPrunerTest extends ControllerTest {
@Test
public void testSegmentPrunerFactoryForPartitionPruner() {
TableConfig tableConfig = mock(TableConfig.class);
+ when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
IndexingConfig indexingConfig = mock(IndexingConfig.class);
when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
@@ -141,8 +157,7 @@ public class SegmentPrunerTest extends ControllerTest {
assertEquals(segmentPruners.size(), 0);
// Segment partition config is missing
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
-
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
+
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
assertEquals(segmentPruners.size(), 0);
@@ -189,8 +204,7 @@ public class SegmentPrunerTest extends ControllerTest {
@Test
public void testSegmentPrunerFactoryForTimeRangePruner() {
TableConfig tableConfig = mock(TableConfig.class);
- when(tableConfig.getTableName()).thenReturn(RAW_TABLE_NAME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS);
+ when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
// Routing config is missing
List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
@@ -203,8 +217,7 @@ public class SegmentPrunerTest extends ControllerTest {
assertEquals(segmentPruners.size(), 0);
// Validation config is missing
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
- Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
+
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
assertEquals(segmentPruners.size(), 0);
@@ -214,41 +227,54 @@ public class SegmentPrunerTest extends ControllerTest {
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
assertEquals(segmentPruners.size(), 0);
- // Time range pruner should be returned
+ // Schema is missing
when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
+ assertEquals(segmentPruners.size(), 0);
+
+ // Field spec is missing
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+ ZKMetadataProvider.setSchema(_propertyStore, schema);
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
+ assertEquals(segmentPruners.size(), 0);
+
+ // Time range pruner should be returned
+ schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addDateTimeField(TIME_COLUMN, DataType.TIMESTAMP, "TIMESTAMP",
"1:MILLISECONDS").build();
+ ZKMetadataProvider.setSchema(_propertyStore, schema);
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
}
@Test
- public void testEnablingEmptySegmentPruner() {
+ public void testSegmentPrunerFactoryForEmptySegmentPruner() {
TableConfig tableConfig = mock(TableConfig.class);
+ when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
IndexingConfig indexingConfig = mock(IndexingConfig.class);
+ when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
RoutingConfig routingConfig = mock(RoutingConfig.class);
- StreamIngestionConfig streamIngestionConfig =
mock(StreamIngestionConfig.class);
+ when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
// When routingConfig is configured with EmptySegmentPruner,
EmptySegmentPruner should be returned.
- when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
- Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
- List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+
when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
+ List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// When indexingConfig is configured with Kinesis streaming,
EmptySegmentPruner should be returned.
- when(indexingConfig.getStreamConfigs()).thenReturn(
- Collections.singletonMap(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE));
- segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
+
when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// When streamIngestionConfig is configured with Kinesis streaming,
EmptySegmentPruner should be returned.
+ StreamIngestionConfig streamIngestionConfig =
mock(StreamIngestionConfig.class);
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
-
Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE)));
- when(indexingConfig.getStreamConfigs()).thenReturn(
- Collections.singletonMap(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE));
- segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
_propertyStore);
+ List.of(Map.of(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE)));
+
when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE,
KINESIS_STREAM_TYPE));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig,
propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
}
@@ -259,95 +285,76 @@ public class SegmentPrunerTest extends ControllerTest {
BrokerRequest brokerRequest2 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
BrokerRequest brokerRequest3 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
BrokerRequest brokerRequest4 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_4);
+
// NOTE: Ideal state and external view are not used in the current
implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner =
new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME,
PARTITION_COLUMN_1);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
- _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner);
Set<String> onlineSegments = new HashSet<>();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
Collections.emptySet()),
- Collections.emptySet());
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
Collections.emptySet()),
- Collections.emptySet());
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
Collections.emptySet()),
- Collections.emptySet());
+
+ Set<String> input = Set.of();
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4,
input), input);
// Segments without metadata (not updated yet) should not be pruned
String newSegment = "newSegment";
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
+ onlineSegments.add(newSegment);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ input = Set.of(newSegment);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4,
input), input);
// Segments without partition metadata should not be pruned
String segmentWithoutPartitionMetadata = "segmentWithoutPartitionMetadata";
- onlineSegments.add(segmentWithoutPartitionMetadata);
- SegmentZKMetadata segmentZKMetadataWithoutPartitionMetadata =
- new SegmentZKMetadata(segmentWithoutPartitionMetadata);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
- segmentZKMetadataWithoutPartitionMetadata);
+ new SegmentZKMetadata(segmentWithoutPartitionMetadata));
+ onlineSegments.add(segmentWithoutPartitionMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
- new
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
- new
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
- new
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
+ input = Set.of(segmentWithoutPartitionMetadata);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4,
input), input);
// Test different partition functions and number of partitions
// 0 % 5 = 0; 1 % 5 = 1; 2 % 5 = 2
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 5,
0);
+ onlineSegments.add(segment0);
// Murmur(0) % 4 = 0; Murmur(1) % 4 = 3; Murmur(2) % 4 = 0
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4,
0);
+ onlineSegments.add(segment1);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
+ input = Set.of(segment0, segment1);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), Set.of(segment1));
// Update partition metadata without refreshing should have no effect
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4,
1);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), Set.of(segment1));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4,
input), input);
// Refresh the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment0);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
input), Set.of(segment1));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4,
input), Set.of(segment1));
// Multi-column partitioned segment.
MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner =
@@ -356,38 +363,25 @@ public class SegmentPrunerTest extends ControllerTest {
segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1,
Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2,
Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3,
Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
Collections.emptySet()),
- Collections.emptySet());
+
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2,
input), Set.of(segment1));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3,
input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
input), Set.of(segment1));
String segment2 = "segment2";
- onlineSegments.add(segment2);
Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new
HashMap<>();
- columnPartitionMetadataMap.put(PARTITION_COLUMN_1,
- new ColumnPartitionMetadata("Modulo", 4, Collections.singleton(0),
null));
- Map<String, String> partitionColumn2FunctionConfig = new HashMap<>();
- partitionColumn2FunctionConfig.put("columnValues", "xyz|abc");
- partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|");
- columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new
ColumnPartitionMetadata(
- "BoundedColumnValue", 3, Collections.singleton(1),
partitionColumn2FunctionConfig));
+ columnPartitionMetadataMap.put(PARTITION_COLUMN_1, new
ColumnPartitionMetadata("Modulo", 4, Set.of(0), null));
+ columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new
ColumnPartitionMetadata("BoundedColumnValue", 3, Set.of(1),
+ Map.of("columnValues", "xyz|abc", "columnValuesDelimiter", "|")));
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2,
columnPartitionMetadataMap);
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
- new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new
HashSet<>(Arrays.asList(segment1, segment2)));
+ input = Set.of(segment0, segment1, segment2);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1,
input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2,
input), Set.of(segment1, segment2));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3,
input), Set.of(segment0, segment1));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
input), Set.of(segment1, segment2));
}
@Test
@@ -399,143 +393,112 @@ public class SegmentPrunerTest extends ControllerTest {
BrokerRequest brokerRequest5 =
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_4);
BrokerRequest brokerRequest6 =
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_5);
BrokerRequest brokerRequest7 =
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_6);
+ BrokerRequest brokerRequest8 =
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_7);
+ BrokerRequest brokerRequest9 =
CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_8);
+
 // NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
- TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
- _propertyStore);
- Set<String> onlineSegments = new HashSet<>();
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ DateTimeFieldSpec timeFieldSpec = new DateTimeFieldSpec(TIME_COLUMN,
DataType.INT, "EPOCH|DAYS", "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
+ Set<String> onlineSegments = new HashSet<>();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest4, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest5, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest6, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest7, Collections.emptySet()),
Collections.emptySet());
-
- // Initialize with non-empty onlineSegments
+
+ Set<String> input = Set.of();
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), input);
+
// Segments without metadata (not updated yet) should not be pruned
- segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
_propertyStore);
- segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
- segmentZkMetadataFetcher.register(segmentPruner);
String newSegment = "newSegment";
onlineSegments.add(newSegment);
- segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest3,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest4,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest5,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest6,
Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest7,
Collections.singleton(newSegment)),
- Collections.emptySet()); // query with invalid range will always have
empty filtered result
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
+ input = Set.of(newSegment);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Segments without time range metadata should not be pruned
String segmentWithoutTimeRangeMetadata = "segmentWithoutTimeRangeMetadata";
- onlineSegments.add(segmentWithoutTimeRangeMetadata);
SegmentZKMetadata segmentZKMetadataWithoutTimeRangeMetadata =
new SegmentZKMetadata(segmentWithoutTimeRangeMetadata);
-
segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTimeRangeMetadata);
+ onlineSegments.add(segmentWithoutTimeRangeMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(
- segmentPruner.prune(brokerRequest1, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest2, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest3, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest4, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest5, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest6, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest7, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Test different time range
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60,
TimeUnit.DAYS);
-
+ onlineSegments.add(segment0);
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30,
TimeUnit.DAYS);
-
+ onlineSegments.add(segment1);
String segment2 = "segment2";
- onlineSegments.add(segment2);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65,
TimeUnit.DAYS);
-
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest4, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest6, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest7, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
 // Update metadata without external view change or refreshing should have no effect
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30,
TimeUnit.DAYS);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest4, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest6, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest7, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0,
segment2));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Refresh the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment2);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest4, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest5, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest6, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest7, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
}
@Test
@@ -545,215 +508,175 @@ public class SegmentPrunerTest extends ControllerTest {
BrokerRequest brokerRequest3 =
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_3);
BrokerRequest brokerRequest4 =
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_4);
BrokerRequest brokerRequest5 =
CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_5);
+
 // NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
- setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN);
-
- TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ DateTimeFieldSpec timeFieldSpec =
+ new DateTimeFieldSpec(TIME_COLUMN, DataType.STRING,
"SIMPLE_DATE_FORMAT|" + SDF_PATTERN, "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
RAW_TABLE_NAME);
- DateTimeFormatSpec dateTimeFormatSpec =
schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec();
-
+ DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0,
timeFormatSpec.fromFormatToMillis("20200101"),
+ timeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment0);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0,
dateTimeFormatSpec.fromFormatToMillis("20200101"),
- dateTimeFormatSpec.fromFormatToMillis("20200228"),
TimeUnit.MILLISECONDS);
-
String segment1 = "segment1";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1,
timeFormatSpec.fromFormatToMillis("20200201"),
+ timeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment1);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1,
dateTimeFormatSpec.fromFormatToMillis("20200201"),
- dateTimeFormatSpec.fromFormatToMillis("20200530"),
TimeUnit.MILLISECONDS);
-
String segment2 = "segment2";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
timeFormatSpec.fromFormatToMillis("20200401"),
+ timeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment2);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
dateTimeFormatSpec.fromFormatToMillis("20200401"),
- dateTimeFormatSpec.fromFormatToMillis("20200430"),
TimeUnit.MILLISECONDS);
-
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments),
Collections.singleton(segment1));
- assertEquals(segmentPruner.prune(brokerRequest4, onlineSegments),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments),
Collections.emptySet());
+
+ Set<String> input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of());
}
@Test
- public void testTimeSegmentPrunerSql() {
- BrokerRequest brokerRequest1 =
CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
- BrokerRequest brokerRequest2 =
CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_2);
+ public void testTimeSegmentPrunerTimestampFormat() {
+ BrokerRequest brokerRequest1 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_1);
+ BrokerRequest brokerRequest2 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_2);
+ BrokerRequest brokerRequest3 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_3);
+ BrokerRequest brokerRequest4 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_4);
+ BrokerRequest brokerRequest5 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_5);
+ BrokerRequest brokerRequest6 =
CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_6);
+
 // NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-
- TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ // Intentionally put EPOCH as the format which Pinot should handle
+ DateTimeFieldSpec timeFieldSpec =
+ new DateTimeFieldSpec(TIME_COLUMN, DataType.TIMESTAMP,
"EPOCH|MILLISECONDS", "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
+ DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0,
+ timeFormatSpec.fromFormatToMillis("2020-01-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-02-28 00:00:00"),
TimeUnit.MILLISECONDS);
onlineSegments.add(segment0);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60,
TimeUnit.DAYS);
String segment1 = "segment1";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1,
+ timeFormatSpec.fromFormatToMillis("2020-02-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-05-30 00:00:00"),
TimeUnit.MILLISECONDS);
onlineSegments.add(segment1);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30,
TimeUnit.DAYS);
String segment2 = "segment2";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
+ timeFormatSpec.fromFormatToMillis("2020-04-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-04-30 00:00:00"),
TimeUnit.MILLISECONDS);
onlineSegments.add(segment2);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65,
TimeUnit.DAYS);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment1)));
+ Set<String> input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0,
segment1));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
}
@Test
public void testEmptySegmentPruner() {
BrokerRequest brokerRequest1 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_1);
- BrokerRequest brokerRequest2 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
- BrokerRequest brokerRequest3 =
CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
+
 // NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
- // init with list of segments
+ // Init with a list of segments
EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
+ onlineSegments.add(segment0);
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+ onlineSegments.add(segment1);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
-
- // init with empty list of segments
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Set.of(segment0));
+
+ // Init with no segment
segmentPruner = new EmptySegmentPruner(tableConfig);
segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
onlineSegments.clear();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()),
Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()),
Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
onlineSegments);
// Segments without metadata (not updated yet) should not be pruned
String newSegment = "newSegment";
onlineSegments.add(newSegment);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest3,
Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
onlineSegments);
// Segments without totalDocs metadata should not be pruned
- onlineSegments.clear();
String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata";
- onlineSegments.add(segmentWithoutTotalDocsMetadata);
SegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata =
new SegmentZKMetadata(segmentWithoutTotalDocsMetadata);
-
segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTotalDocsMetadata);
+ onlineSegments.add(segmentWithoutTotalDocsMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest3,
Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
onlineSegments);
// Segments with -1 totalDocs should not be pruned
- onlineSegments.clear();
String segmentWithNegativeTotalDocsMetadata =
"segmentWithNegativeTotalDocsMetadata";
- onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME,
segmentWithNegativeTotalDocsMetadata, -1);
+ onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest3,
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
onlineSegments);
// Prune segments with 0 total docs
onlineSegments.clear();
- onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
- onlineSegments.add(segment1);
+ onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+ onlineSegments.add(segment1);
String segment2 = "segment2";
- onlineSegments.add(segment2);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1);
-
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Set.of(segment0, segment2));
// Update metadata without external view change or refreshing should have no effect
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30,
TimeUnit.DAYS);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, 0);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Set.of(segment0, segment2));
// Refresh the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment2);
- assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest3, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- }
-
- private TableConfig getTableConfig(String rawTableName, TableType type) {
- return new
TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build();
- }
-
- private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit
timeUnit) {
- ZKMetadataProvider.setSchema(_propertyStore, new
Schema.SchemaBuilder().setSchemaName(rawTableName)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:" + timeUnit +
":EPOCH", "1:" + timeUnit).build());
- }
-
- private void setSchemaDateTimeFieldSpecSDF(String rawTableName, String
format) {
- ZKMetadataProvider.setSchema(_propertyStore, new
Schema.SchemaBuilder().setSchemaName(rawTableName)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.STRING,
"1:DAYS:SIMPLE_DATE_FORMAT:" + format, "1:DAYS").build());
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Set.of(segment0));
}
private void setSegmentZKPartitionMetadata(String tableNameWithType, String
segment, String partitionFunction,
int numPartitions, int partitionId) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
- segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN_1,
- new ColumnPartitionMetadata(partitionFunction, numPartitions,
Collections.singleton(partitionId), null))));
+ segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(Map.of(PARTITION_COLUMN_1,
+ new ColumnPartitionMetadata(partitionFunction, numPartitions,
Set.of(partitionId), null))));
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType,
segmentZKMetadata);
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index 626a091005..e8fd128729 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -221,8 +221,7 @@ public class SchemaTest {
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.DAYS, "time"), null)
.addDateTime("dateTime0", FieldSpec.DataType.LONG, "1:HOURS:EPOCH",
"1:HOURS")
.addDateTime("dateTime1", FieldSpec.DataType.TIMESTAMP,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
- .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH",
"1:DAYS")
- .build();
+ .addDateTime("dateTime2", FieldSpec.DataType.INT, "1:DAYS:EPOCH",
"1:DAYS").build();
// Test method which fetches the DateTimeFieldSpec given the timeColumnName
// Test is on TIME
@@ -254,7 +253,7 @@ public class SchemaTest {
Assert.assertEquals(dateTimeFieldSpec.getDataType(),
FieldSpec.DataType.TIMESTAMP);
Assert.assertTrue(dateTimeFieldSpec.isSingleValueField());
Assert.assertEquals(dateTimeFieldSpec.getDefaultNullValue(), 0L);
- Assert.assertEquals(dateTimeFieldSpec.getFormat(), "1:MILLISECONDS:EPOCH");
+ Assert.assertEquals(dateTimeFieldSpec.getFormat(), "TIMESTAMP");
Assert.assertEquals(dateTimeFieldSpec.getGranularity(), "1:MILLISECONDS");
dateTimeFieldSpec = schema.getSpecForTimeColumn("dateTime2");
@@ -326,15 +325,10 @@ public class SchemaTest {
@Test
public void testSerializeDeserializeOptions()
throws IOException {
- String json = "{\n"
- + " \"primaryKeyColumns\" : null,\n"
- + " \"timeFieldSpec\" : null,\n"
- + " \"schemaName\" : null,\n"
- + " \"enableColumnBasedNullHandling\" : true,\n"
- + " \"dimensionFieldSpecs\" : [ ],\n"
- + " \"metricFieldSpecs\" : [ ],\n"
- + " \"dateTimeFieldSpecs\" : [ ]\n"
- + "}";
+ String json =
+ "{\n" + " \"primaryKeyColumns\" : null,\n" + " \"timeFieldSpec\" :
null,\n" + " \"schemaName\" : null,\n"
+ + " \"enableColumnBasedNullHandling\" : true,\n" + "
\"dimensionFieldSpecs\" : [ ],\n"
+ + " \"metricFieldSpecs\" : [ ],\n" + " \"dateTimeFieldSpecs\" :
[ ]\n" + "}";
JsonNode expectedNode = JsonUtils.stringToJsonNode(json);
Schema schema = JsonUtils.jsonNodeToObject(expectedNode, Schema.class);
@@ -363,6 +357,17 @@ public class SchemaTest {
Assert.assertEquals(schemaFromJson.hashCode(), schema.hashCode());
}
+ @Test
+ public void testTimestampFormatOverride()
+ throws Exception {
+ URL resourceUrl =
getClass().getClassLoader().getResource("schemaTest.schema");
+ Assert.assertNotNull(resourceUrl);
+ Schema schema = Schema.fromFile(new File(resourceUrl.getFile()));
+ DateTimeFieldSpec fieldSpec = schema.getDateTimeSpec("dateTime3");
+ Assert.assertNotNull(fieldSpec);
+ Assert.assertEquals(fieldSpec.getFormat(), "TIMESTAMP");
+ }
+
@Test
public void testByteType()
throws Exception {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
index ea9285a104..dbb92090d1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFieldSpec.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.EqualityUtils;
+
@SuppressWarnings("unused")
@JsonIgnoreProperties(ignoreUnknown = true)
public final class DateTimeFieldSpec extends FieldSpec {
@@ -74,6 +75,10 @@ public final class DateTimeFieldSpec extends FieldSpec {
@Nullable Object sampleValue) {
super(name, dataType, true);
+ // Override format to be "TIMESTAMP" for TIMESTAMP data type because the format is implicit
+ if (dataType == DataType.TIMESTAMP) {
+ format = TimeFormat.TIMESTAMP.name();
+ }
_format = format;
_granularity = granularity;
_formatSpec = new DateTimeFormatSpec(format);
@@ -119,13 +124,23 @@ public final class DateTimeFieldSpec extends FieldSpec {
Preconditions.checkArgument(isSingleValueField, "Unsupported multi-value
for date time field.");
}
+ @Override
+ public void setDataType(DataType dataType) {
+ super.setDataType(dataType);
+ if (dataType == DataType.TIMESTAMP) {
+ _format = TimeFormat.TIMESTAMP.name();
+ }
+ }
+
public String getFormat() {
return _format;
}
// Required by JSON de-serializer. DO NOT REMOVE.
public void setFormat(String format) {
- _format = format;
+ if (_dataType != DataType.TIMESTAMP) {
+ _format = format;
+ }
}
@JsonIgnore
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]