This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4a0c574983 allow to take data outside the time window by negating the
window filter (#8640)
4a0c574983 is described below
commit 4a0c57498321928aa8eded0407dc63e4e808f577
Author: Xiaobing <[email protected]>
AuthorDate: Thu May 5 11:52:14 2022 -0700
allow to take data outside the time window by negating the window filter
(#8640)
Add the config negateWindowFilter so that SegmentProcessorFramework can take
data outside the configured time window when generating segments.
---
.../apache/pinot/core/common/MinionConstants.java | 1 +
.../processing/timehandler/EpochTimeHandler.java | 10 ++++++---
.../processing/timehandler/TimeHandlerConfig.java | 25 +++++++++++++++++-----
.../processing/timehandler/TimeHandlerFactory.java | 4 ++--
.../processing/framework/SegmentMapperTest.java | 21 +++++++++++++-----
.../framework/SegmentProcessorFrameworkTest.java | 17 +++++++++++++++
.../pinot/plugin/minion/tasks/MergeTaskUtils.java | 8 ++++---
7 files changed, 68 insertions(+), 18 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8ef5084a84..e0560248ad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -87,6 +87,7 @@ public class MinionConstants {
// Time handling config
public static final String WINDOW_START_MS_KEY = "windowStartMs";
public static final String WINDOW_END_MS_KEY = "windowEndMs";
+ public static final String NEGATE_WINDOW_FILTER = "negateWindowFilter";
public static final String ROUND_BUCKET_TIME_PERIOD_KEY =
"roundBucketTimePeriod";
public static final String PARTITION_BUCKET_TIME_PERIOD_KEY =
"partitionBucketTimePeriod";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
index 26e360a3f1..ccba8bfc9f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
@@ -34,16 +34,19 @@ public class EpochTimeHandler implements TimeHandler {
private final DateTimeFormatSpec _formatSpec;
private final long _startTimeMs;
private final long _endTimeMs;
+ private final boolean _negateWindowFilter;
+
private final long _roundBucketMs;
private final long _partitionBucketMs;
- public EpochTimeHandler(DateTimeFieldSpec fieldSpec, long startTimeMs, long
endTimeMs, long roundBucketMs,
- long partitionBucketMs) {
+ public EpochTimeHandler(DateTimeFieldSpec fieldSpec, long startTimeMs, long
endTimeMs, boolean negateWindowFilter,
+ long roundBucketMs, long partitionBucketMs) {
_timeColumn = fieldSpec.getName();
_dataType = fieldSpec.getDataType();
_formatSpec = new DateTimeFormatSpec(fieldSpec.getFormat());
_startTimeMs = startTimeMs;
_endTimeMs = endTimeMs;
+ _negateWindowFilter = negateWindowFilter;
_roundBucketMs = roundBucketMs;
_partitionBucketMs = partitionBucketMs;
}
@@ -52,7 +55,8 @@ public class EpochTimeHandler implements TimeHandler {
public String handleTime(GenericRow row) {
long timeMs =
_formatSpec.fromFormatToMillis(row.getValue(_timeColumn).toString());
if (_startTimeMs > 0) {
- if (timeMs < _startTimeMs || timeMs >= _endTimeMs) {
+ boolean outsideTimeWindow = (timeMs < _startTimeMs || timeMs >=
_endTimeMs);
+ if (outsideTimeWindow != _negateWindowFilter) {
return null;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
index c7fb0c1eb3..2c0bf23882 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
@@ -24,9 +24,11 @@ package org.apache.pinot.core.segment.processing.timehandler;
public class TimeHandlerConfig {
private final TimeHandler.Type _type;
- // Time values not within the time range [_startTimeMs, _endTimeMs) are
filtered out
+ // When _negateWindowFilter is false (the default), time values outside the range [_startTimeMs, _endTimeMs) are
+ // filtered out. When it is true, time values inside the range are filtered out instead.
private final long _startTimeMs;
private final long _endTimeMs;
+ private final boolean _negateWindowFilter;
// Time values are rounded (floored) to the nearest time bucket
// newTimeValue = (originalTimeValue / _roundBucketMs) * _roundBucketMs
@@ -36,11 +38,12 @@ public class TimeHandlerConfig {
// partition = Long.toString(timeMs / _partitionBucketMs)
private final long _partitionBucketMs;
- private TimeHandlerConfig(TimeHandler.Type type, long startTimeMs, long
endTimeMs, long roundBucketMs,
- long partitionBucketMs) {
+ private TimeHandlerConfig(TimeHandler.Type type, long startTimeMs, long
endTimeMs, boolean negateWindowFilter,
+ long roundBucketMs, long partitionBucketMs) {
_type = type;
_startTimeMs = startTimeMs;
_endTimeMs = endTimeMs;
+ _negateWindowFilter = negateWindowFilter;
_roundBucketMs = roundBucketMs;
_partitionBucketMs = partitionBucketMs;
}
@@ -57,6 +60,10 @@ public class TimeHandlerConfig {
return _endTimeMs;
}
+ public boolean isNegateWindowFilter() {
+ return _negateWindowFilter;
+ }
+
public long getRoundBucketMs() {
return _roundBucketMs;
}
@@ -68,7 +75,8 @@ public class TimeHandlerConfig {
@Override
public String toString() {
return "TimeHandlerConfig{" + "_type=" + _type + ", _startTimeMs=" +
_startTimeMs + ", _endTimeMs=" + _endTimeMs
- + ", _roundBucketMs=" + _roundBucketMs + ", _partitionBucketMs=" +
_partitionBucketMs + '}';
+ + ", _negateWindowFilter=" + _negateWindowFilter + ", _roundBucketMs="
+ _roundBucketMs
+ + ", _partitionBucketMs=" + _partitionBucketMs + '}';
}
public static class Builder {
@@ -76,6 +84,7 @@ public class TimeHandlerConfig {
private long _startTimeMs = -1;
private long _endTimeMs = -1;
+ private boolean _negateWindowFilter = false;
private long _roundBucketMs = -1;
private long _partitionBucketMs = -1;
@@ -89,6 +98,11 @@ public class TimeHandlerConfig {
return this;
}
+ public Builder setNegateWindowFilter(boolean negateWindowFilter) {
+ _negateWindowFilter = negateWindowFilter;
+ return this;
+ }
+
public Builder setRoundBucketMs(long roundBucketMs) {
_roundBucketMs = roundBucketMs;
return this;
@@ -100,7 +114,8 @@ public class TimeHandlerConfig {
}
public TimeHandlerConfig build() {
- return new TimeHandlerConfig(_type, _startTimeMs, _endTimeMs,
_roundBucketMs, _partitionBucketMs);
+ return new TimeHandlerConfig(_type, _startTimeMs, _endTimeMs,
_negateWindowFilter, _roundBucketMs,
+ _partitionBucketMs);
}
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
index 6ed859b4f1..b579073fc6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
@@ -48,8 +48,8 @@ public class TimeHandlerFactory {
Preconditions.checkState(dateTimeFieldSpec != null,
"Time column: %s is not configured as DateTimeField within the
schema", timeColumn);
return new EpochTimeHandler(dateTimeFieldSpec,
timeHandlerConfig.getStartTimeMs(),
- timeHandlerConfig.getEndTimeMs(),
timeHandlerConfig.getRoundBucketMs(),
- timeHandlerConfig.getPartitionBucketMs());
+ timeHandlerConfig.getEndTimeMs(),
timeHandlerConfig.isNegateWindowFilter(),
+ timeHandlerConfig.getRoundBucketMs(),
timeHandlerConfig.getPartitionBucketMs());
default:
throw new IllegalStateException("Unsupported time handler type: " +
type);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 207e0c6749..5e83d715b5 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -224,9 +224,10 @@ public class SegmentMapperTest {
new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
Arrays.asList(
new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
- .setColumnName("campaign").build(), new
PartitionerConfig.Builder().setPartitionerType(
-
PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG).setColumnName("clicks")
- .setColumnPartitionConfig(new
ColumnPartitionConfig("Modulo", 3, null)).build())).build();
+ .setColumnName("campaign").build(), new
PartitionerConfig.Builder()
+
.setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
+ .setColumnName("clicks").setColumnPartitionConfig(new
ColumnPartitionConfig("Modulo", 3, null))
+ .build())).build();
Map<String, List<Object[]>> expectedRecords4 = new HashMap<>();
for (Object[] record : outputData) {
String partition = "0_" + record[0] + "_" + ((int) record[1] % 3);
@@ -267,11 +268,21 @@ public class SegmentMapperTest {
new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L,
1597881600000L)
.setRoundBucketMs(3600000).setPartitionBucketMs(86400000).build()).build();
- Map<String, List<Object[]>> expectedRecords9 =
+ Map<String, List<Object[]>> expectedRecords8 =
outputData.stream().filter(r -> ((long) r[2]) >= 1597795200000L &&
((long) r[2]) < 1597881600000L)
.map(r -> new Object[]{r[0], r[1], (((long) r[2]) / 3600000) *
3600000})
.collect(Collectors.groupingBy(r -> Long.toString(((long) r[2]) /
86400000), Collectors.toList()));
- inputs.add(new Object[]{config8, expectedRecords9});
+ inputs.add(new Object[]{config8, expectedRecords8});
+
+ // Time handling - negated window filter: only records outside the time range are kept
+ SegmentProcessorConfig config9 =
+ new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
+ new
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L,
1597881600000L)
+ .setNegateWindowFilter(true).build()).build();
+ Map<String, List<Object[]>> expectedRecords9 =
+ outputData.stream().filter(r -> !(((long) r[2]) >= 1597795200000L &&
((long) r[2]) < 1597881600000L))
+ .collect(Collectors.groupingBy(r -> "0", Collectors.toList()));
+ inputs.add(new Object[]{config9, expectedRecords9});
return inputs.toArray(new Object[0][]);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index f33262d7cc..402080d5b0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -281,6 +281,23 @@ public class SegmentProcessorFrameworkTest {
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);
+ // Negate time filter
+ config = new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
+ new
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L,
1597881600000L)
+ .setNegateWindowFilter(true).build()).build();
+ framework = new SegmentProcessorFramework(_singleSegment, config,
workingDir);
+ outputSegments = framework.process();
+ assertEquals(outputSegments.size(), 1);
+ segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+ assertEquals(segmentMetadata.getTotalDocs(), 5);
+ timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+ assertEquals(timeMetadata.getCardinality(), 5);
+ assertEquals(timeMetadata.getMinValue(), 1597719600000L);
+ assertEquals(timeMetadata.getMaxValue(), 1597892400000L);
+ assertEquals(segmentMetadata.getName(),
"myTable_1597719600000_1597892400000_0");
+ FileUtils.cleanDirectory(workingDir);
+ rewindRecordReaders(_singleSegment);
+
// Time filter - filtered everything
config = new
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
new
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597968000000L,
1598054400000L).build())
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 7c027af227..26c9e59e39 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -64,15 +64,17 @@ public class MergeTaskUtils {
return null;
}
DateTimeFieldSpec fieldSpec = schema.getSpecForTimeColumn(timeColumn);
- Preconditions.checkState(fieldSpec != null, "No valid spec found for time
column: %s in schema for table: %s",
- timeColumn, tableConfig.getTableName());
+ Preconditions
+ .checkState(fieldSpec != null, "No valid spec found for time column:
%s in schema for table: %s", timeColumn,
+ tableConfig.getTableName());
TimeHandlerConfig.Builder timeHandlerConfigBuilder = new
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH);
String windowStartMs = taskConfig.get(MergeTask.WINDOW_START_MS_KEY);
String windowEndMs = taskConfig.get(MergeTask.WINDOW_END_MS_KEY);
if (windowStartMs != null && windowEndMs != null) {
- timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs),
Long.parseLong(windowEndMs));
+ timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs),
Long.parseLong(windowEndMs))
+
.setNegateWindowFilter(Boolean.parseBoolean(taskConfig.get(MergeTask.NEGATE_WINDOW_FILTER)));
}
String roundBucketTimePeriod =
taskConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]