This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a0c574983 allow to take data outside the time window by negating the window filter (#8640)
4a0c574983 is described below

commit 4a0c57498321928aa8eded0407dc63e4e808f577
Author: Xiaobing <[email protected]>
AuthorDate: Thu May 5 11:52:14 2022 -0700

    allow to take data outside the time window by negating the window filter (#8640)
    
    add config negateWindowFilter to let SegmentProcessorFramework take data outside the time window when generating segments
---
 .../apache/pinot/core/common/MinionConstants.java  |  1 +
 .../processing/timehandler/EpochTimeHandler.java   | 10 ++++++---
 .../processing/timehandler/TimeHandlerConfig.java  | 25 +++++++++++++++++-----
 .../processing/timehandler/TimeHandlerFactory.java |  4 ++--
 .../processing/framework/SegmentMapperTest.java    | 21 +++++++++++++-----
 .../framework/SegmentProcessorFrameworkTest.java   | 17 +++++++++++++++
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  |  8 ++++---
 7 files changed, 68 insertions(+), 18 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 8ef5084a84..e0560248ad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -87,6 +87,7 @@ public class MinionConstants {
     // Time handling config
     public static final String WINDOW_START_MS_KEY = "windowStartMs";
     public static final String WINDOW_END_MS_KEY = "windowEndMs";
+    public static final String NEGATE_WINDOW_FILTER = "negateWindowFilter";
     public static final String ROUND_BUCKET_TIME_PERIOD_KEY = 
"roundBucketTimePeriod";
     public static final String PARTITION_BUCKET_TIME_PERIOD_KEY = 
"partitionBucketTimePeriod";
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
index 26e360a3f1..ccba8bfc9f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/EpochTimeHandler.java
@@ -34,16 +34,19 @@ public class EpochTimeHandler implements TimeHandler {
   private final DateTimeFormatSpec _formatSpec;
   private final long _startTimeMs;
   private final long _endTimeMs;
+  private final boolean _negateWindowFilter;
+
   private final long _roundBucketMs;
   private final long _partitionBucketMs;
 
-  public EpochTimeHandler(DateTimeFieldSpec fieldSpec, long startTimeMs, long 
endTimeMs, long roundBucketMs,
-      long partitionBucketMs) {
+  public EpochTimeHandler(DateTimeFieldSpec fieldSpec, long startTimeMs, long 
endTimeMs, boolean negateWindowFilter,
+      long roundBucketMs, long partitionBucketMs) {
     _timeColumn = fieldSpec.getName();
     _dataType = fieldSpec.getDataType();
     _formatSpec = new DateTimeFormatSpec(fieldSpec.getFormat());
     _startTimeMs = startTimeMs;
     _endTimeMs = endTimeMs;
+    _negateWindowFilter = negateWindowFilter;
     _roundBucketMs = roundBucketMs;
     _partitionBucketMs = partitionBucketMs;
   }
@@ -52,7 +55,8 @@ public class EpochTimeHandler implements TimeHandler {
   public String handleTime(GenericRow row) {
     long timeMs = 
_formatSpec.fromFormatToMillis(row.getValue(_timeColumn).toString());
     if (_startTimeMs > 0) {
-      if (timeMs < _startTimeMs || timeMs >= _endTimeMs) {
+      boolean outsideTimeWindow = (timeMs < _startTimeMs || timeMs >= 
_endTimeMs);
+      if (outsideTimeWindow != _negateWindowFilter) {
         return null;
       }
     }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
index c7fb0c1eb3..2c0bf23882 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerConfig.java
@@ -24,9 +24,11 @@ package org.apache.pinot.core.segment.processing.timehandler;
 public class TimeHandlerConfig {
   private final TimeHandler.Type _type;
 
-  // Time values not within the time range [_startTimeMs, _endTimeMs) are 
filtered out
+  // By default negateWindowFilter is false, and the time values not within 
the time range [_startTimeMs, _endTimeMs)
+  // are filtered out. Otherwise, those inside the time range are filtered out 
instead.
   private final long _startTimeMs;
   private final long _endTimeMs;
+  private final boolean _negateWindowFilter;
 
   // Time values are rounded (floored) to the nearest time bucket
   // newTimeValue = (originalTimeValue / _roundBucketMs) * _roundBucketMs
@@ -36,11 +38,12 @@ public class TimeHandlerConfig {
   // partition = Long.toString(timeMs / _partitionBucketMs)
   private final long _partitionBucketMs;
 
-  private TimeHandlerConfig(TimeHandler.Type type, long startTimeMs, long 
endTimeMs, long roundBucketMs,
-      long partitionBucketMs) {
+  private TimeHandlerConfig(TimeHandler.Type type, long startTimeMs, long 
endTimeMs, boolean negateWindowFilter,
+      long roundBucketMs, long partitionBucketMs) {
     _type = type;
     _startTimeMs = startTimeMs;
     _endTimeMs = endTimeMs;
+    _negateWindowFilter = negateWindowFilter;
     _roundBucketMs = roundBucketMs;
     _partitionBucketMs = partitionBucketMs;
   }
@@ -57,6 +60,10 @@ public class TimeHandlerConfig {
     return _endTimeMs;
   }
 
+  public boolean isNegateWindowFilter() {
+    return _negateWindowFilter;
+  }
+
   public long getRoundBucketMs() {
     return _roundBucketMs;
   }
@@ -68,7 +75,8 @@ public class TimeHandlerConfig {
   @Override
   public String toString() {
     return "TimeHandlerConfig{" + "_type=" + _type + ", _startTimeMs=" + 
_startTimeMs + ", _endTimeMs=" + _endTimeMs
-        + ", _roundBucketMs=" + _roundBucketMs + ", _partitionBucketMs=" + 
_partitionBucketMs + '}';
+        + ", _negateWindowFilter=" + _negateWindowFilter + ", _roundBucketMs=" 
+ _roundBucketMs
+        + ", _partitionBucketMs=" + _partitionBucketMs + '}';
   }
 
   public static class Builder {
@@ -76,6 +84,7 @@ public class TimeHandlerConfig {
 
     private long _startTimeMs = -1;
     private long _endTimeMs = -1;
+    private boolean _negateWindowFilter = false;
     private long _roundBucketMs = -1;
     private long _partitionBucketMs = -1;
 
@@ -89,6 +98,11 @@ public class TimeHandlerConfig {
       return this;
     }
 
+    public Builder setNegateWindowFilter(boolean negateWindowFilter) {
+      _negateWindowFilter = negateWindowFilter;
+      return this;
+    }
+
     public Builder setRoundBucketMs(long roundBucketMs) {
       _roundBucketMs = roundBucketMs;
       return this;
@@ -100,7 +114,8 @@ public class TimeHandlerConfig {
     }
 
     public TimeHandlerConfig build() {
-      return new TimeHandlerConfig(_type, _startTimeMs, _endTimeMs, 
_roundBucketMs, _partitionBucketMs);
+      return new TimeHandlerConfig(_type, _startTimeMs, _endTimeMs, 
_negateWindowFilter, _roundBucketMs,
+          _partitionBucketMs);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
index 6ed859b4f1..b579073fc6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/timehandler/TimeHandlerFactory.java
@@ -48,8 +48,8 @@ public class TimeHandlerFactory {
         Preconditions.checkState(dateTimeFieldSpec != null,
             "Time column: %s is not configured as DateTimeField within the 
schema", timeColumn);
         return new EpochTimeHandler(dateTimeFieldSpec, 
timeHandlerConfig.getStartTimeMs(),
-            timeHandlerConfig.getEndTimeMs(), 
timeHandlerConfig.getRoundBucketMs(),
-            timeHandlerConfig.getPartitionBucketMs());
+            timeHandlerConfig.getEndTimeMs(), 
timeHandlerConfig.isNegateWindowFilter(),
+            timeHandlerConfig.getRoundBucketMs(), 
timeHandlerConfig.getPartitionBucketMs());
       default:
         throw new IllegalStateException("Unsupported time handler type: " + 
type);
     }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 207e0c6749..5e83d715b5 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -224,9 +224,10 @@ public class SegmentMapperTest {
         new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
             Arrays.asList(
                 new 
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
-                    .setColumnName("campaign").build(), new 
PartitionerConfig.Builder().setPartitionerType(
-                        
PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG).setColumnName("clicks")
-                    .setColumnPartitionConfig(new 
ColumnPartitionConfig("Modulo", 3, null)).build())).build();
+                    .setColumnName("campaign").build(), new 
PartitionerConfig.Builder()
+                    
.setPartitionerType(PartitionerFactory.PartitionerType.TABLE_PARTITION_CONFIG)
+                    .setColumnName("clicks").setColumnPartitionConfig(new 
ColumnPartitionConfig("Modulo", 3, null))
+                    .build())).build();
     Map<String, List<Object[]>> expectedRecords4 = new HashMap<>();
     for (Object[] record : outputData) {
       String partition = "0_" + record[0] + "_" + ((int) record[1] % 3);
@@ -267,11 +268,21 @@ public class SegmentMapperTest {
         new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
             new 
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 
1597881600000L)
                 
.setRoundBucketMs(3600000).setPartitionBucketMs(86400000).build()).build();
-    Map<String, List<Object[]>> expectedRecords9 =
+    Map<String, List<Object[]>> expectedRecords8 =
         outputData.stream().filter(r -> ((long) r[2]) >= 1597795200000L && 
((long) r[2]) < 1597881600000L)
             .map(r -> new Object[]{r[0], r[1], (((long) r[2]) / 3600000) * 
3600000})
             .collect(Collectors.groupingBy(r -> Long.toString(((long) r[2]) / 
86400000), Collectors.toList()));
-    inputs.add(new Object[]{config8, expectedRecords9});
+    inputs.add(new Object[]{config8, expectedRecords8});
+
+    // Time handling - negate filter with certain times
+    SegmentProcessorConfig config9 =
+        new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
+            new 
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 
1597881600000L)
+                .setNegateWindowFilter(true).build()).build();
+    Map<String, List<Object[]>> expectedRecords9 =
+        outputData.stream().filter(r -> !(((long) r[2]) >= 1597795200000L && 
((long) r[2]) < 1597881600000L))
+            .collect(Collectors.groupingBy(r -> "0", Collectors.toList()));
+    inputs.add(new Object[]{config9, expectedRecords9});
 
     return inputs.toArray(new Object[0][]);
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index f33262d7cc..402080d5b0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -281,6 +281,23 @@ public class SegmentProcessorFrameworkTest {
     FileUtils.cleanDirectory(workingDir);
     rewindRecordReaders(_singleSegment);
 
+    // Negate time filter
+    config = new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
+        new 
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 
1597881600000L)
+            .setNegateWindowFilter(true).build()).build();
+    framework = new SegmentProcessorFramework(_singleSegment, config, 
workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
+    assertEquals(segmentMetadata.getTotalDocs(), 5);
+    timeMetadata = segmentMetadata.getColumnMetadataFor("time");
+    assertEquals(timeMetadata.getCardinality(), 5);
+    assertEquals(timeMetadata.getMinValue(), 1597719600000L);
+    assertEquals(timeMetadata.getMaxValue(), 1597892400000L);
+    assertEquals(segmentMetadata.getName(), 
"myTable_1597719600000_1597892400000_0");
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
+
     // Time filter - filtered everything
     config = new 
SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
         new 
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597968000000L, 
1598054400000L).build())
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 7c027af227..26c9e59e39 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -64,15 +64,17 @@ public class MergeTaskUtils {
       return null;
     }
     DateTimeFieldSpec fieldSpec = schema.getSpecForTimeColumn(timeColumn);
-    Preconditions.checkState(fieldSpec != null, "No valid spec found for time 
column: %s in schema for table: %s",
-        timeColumn, tableConfig.getTableName());
+    Preconditions
+        .checkState(fieldSpec != null, "No valid spec found for time column: 
%s in schema for table: %s", timeColumn,
+            tableConfig.getTableName());
 
     TimeHandlerConfig.Builder timeHandlerConfigBuilder = new 
TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH);
 
     String windowStartMs = taskConfig.get(MergeTask.WINDOW_START_MS_KEY);
     String windowEndMs = taskConfig.get(MergeTask.WINDOW_END_MS_KEY);
     if (windowStartMs != null && windowEndMs != null) {
-      timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs), 
Long.parseLong(windowEndMs));
+      timeHandlerConfigBuilder.setTimeRange(Long.parseLong(windowStartMs), 
Long.parseLong(windowEndMs))
+          
.setNegateWindowFilter(Boolean.parseBoolean(taskConfig.get(MergeTask.NEGATE_WINDOW_FILTER)));
     }
 
     String roundBucketTimePeriod = 
taskConfig.get(MergeTask.ROUND_BUCKET_TIME_PERIOD_KEY);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to