snleee commented on a change in pull request #6259:
URL: https://github.com/apache/incubator-pinot/pull/6259#discussion_r526637778



##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
##########
@@ -570,14 +570,14 @@ void refreshSegment(String segment) {
     }
 
     InstanceSelector.SelectionResult calculateRouting(BrokerRequest 
brokerRequest) {
-      List<String> selectedSegments = _segmentSelector.select(brokerRequest);
+      Set<String> selectedSegments = _segmentSelector.select(brokerRequest);
       if (!selectedSegments.isEmpty()) {
         for (SegmentPruner segmentPruner : _segmentPruners) {
           selectedSegments = segmentPruner.prune(brokerRequest, 
selectedSegments);
         }
       }
       if (!selectedSegments.isEmpty()) {
-        return _instanceSelector.select(brokerRequest, selectedSegments);
+        return _instanceSelector.select(brokerRequest, new 
ArrayList<>(selectedSegments));

Review comment:
       Why did you change the `SegmentSelector` interface to return a `Set` 
if you are going to convert it back to an `ArrayList` anyway?

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
##########
@@ -25,8 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;

Review comment:
       +1 good catch :) 

##########
File path: 
pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
##########
@@ -170,56 +220,213 @@ public void testPartitionAwareSegmentPruner() {
     ZKMetadataProvider
         .setOfflineSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, 
segmentZKMetadataWithoutPartitionMetadata);
     segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, 
Collections.singletonList(segmentWithoutPartitionMetadata)),
+    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
         Collections.singletonList(segmentWithoutPartitionMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest2, 
Collections.singletonList(segmentWithoutPartitionMetadata)),
+    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
         Collections.singletonList(segmentWithoutPartitionMetadata));
-    assertEquals(segmentPruner.prune(brokerRequest3, 
Collections.singletonList(segmentWithoutPartitionMetadata)),
+    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
         Collections.singletonList(segmentWithoutPartitionMetadata));
 
     // Test different partition functions and number of partitions
     // 0 % 5 = 0; 1 % 5 = 1; 2 % 5 = 2
     String segment0 = "segment0";
     onlineSegments.add(segment0);
-    setSegmentZKMetadata(segment0, "Modulo", 5, 0);
+    setSegmentZKPartitionMetadata(segment0, "Modulo", 5, 0);
     // Murmur(0) % 4 = 0; Murmur(1) % 4 = 3; Murmur(2) % 4 = 0
     String segment1 = "segment1";
     onlineSegments.add(segment1);
-    setSegmentZKMetadata(segment1, "Murmur", 4, 0);
+    setSegmentZKPartitionMetadata(segment1, "Murmur", 4, 0);
     segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
-    assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
-    assertEquals(segmentPruner.prune(brokerRequest3, Arrays.asList(segment0, 
segment1)),
-        Collections.singletonList(segment1));
+    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment1)));
 
     // Update partition metadata without refreshing should have no effect
-    setSegmentZKMetadata(segment0, "Modulo", 4, 1);
+    setSegmentZKPartitionMetadata(segment0, "Modulo", 4, 1);
     segmentPruner.onExternalViewChange(externalView, idealState, 
onlineSegments);
-    assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
-    assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
-    assertEquals(segmentPruner.prune(brokerRequest3, Arrays.asList(segment0, 
segment1)),
-        Collections.singletonList(segment1));
+    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment1)));
 
     // Refresh the changed segment should update the segment pruner
     segmentPruner.refreshSegment(segment0);
-    assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
-    assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, 
segment1)),
-        Collections.singletonList(segment1));
-    assertEquals(segmentPruner.prune(brokerRequest3, Arrays.asList(segment0, 
segment1)),
-        Arrays.asList(segment0, segment1));
+    assertEquals(segmentPruner.prune(brokerRequest1, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment1)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new 
HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Arrays.asList(segment0, segment1)));
+  }
+
+  @Test
+  public void testTimeRangeSegmentPruner() {
+    Pql2Compiler compiler = new Pql2Compiler();
+    BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
+    BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_5);
+    BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_6);
+    BrokerRequest brokerRequest4 = compiler.compileToBrokerRequest(QUERY_7);
+    BrokerRequest brokerRequest5 = compiler.compileToBrokerRequest(QUERY_8);
+    BrokerRequest brokerRequest6 = compiler.compileToBrokerRequest(QUERY_9);
+    BrokerRequest brokerRequest7 = compiler.compileToBrokerRequest(QUERY_10);

Review comment:
       Can we also add tests that parse these queries as SQL?

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
##########
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentpruner;
+
+import com.google.common.base.Preconditions;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.broker.routing.segmentpruner.intervalST.Interval;
+import org.apache.pinot.broker.routing.segmentpruner.intervalST.IntervalST;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code TimeSegmentPruner} prunes segments based on their time column 
start & end time metadata stored in ZK. The pruner
+ * supports queries with filter (or nested filter) of EQUALITY and RANGE 
predicates.
+ */
+public class TimeSegmentPruner implements SegmentPruner {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TimeSegmentPruner.class);
+  private static final long MAX_END_TIME = Long.MAX_VALUE;
+  private static final long MIN_START_TIME = 0;
+
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore _propertyStore;
+  private final String _segmentZKMetadataPathPrefix;
+  private final String _timeColumn;
+  private final TimeUnit _timeUnit;
+
+  private volatile IntervalST _timeRangeMSToSegmentSearchTree;
+  private final Map _segmentToTimeRangeMSCache;
+
+  public TimeSegmentPruner(TableConfig tableConfig, 
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableConfig.getTableName();
+    _propertyStore = propertyStore;
+    _segmentZKMetadataPathPrefix = 
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + 
"/";
+    _timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    Preconditions
+        .checkNotNull(_timeColumn, "Time column must be configured in table 
config for table: %s", _tableNameWithType);
+
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
+    Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", _tableNameWithType);
+    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
+    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in 
schema for time column: %s of table: %s",
+        _timeColumn, _tableNameWithType);
+    DateTimeFormatSpec formatSpec = new 
DateTimeFormatSpec(dateTimeSpec.getFormat());
+    _timeUnit = formatSpec.getColumnUnit();
+    Preconditions
+        .checkNotNull(_timeUnit, "Time unit must be configured in the field 
spec for time column: %s of table: %s",
+            _timeColumn, _tableNameWithType);
+    _segmentToTimeRangeMSCache = new HashMap<String, Interval>();
+  }
+
+  @Override
+  public void init(ExternalView externalView, IdealState idealState, 
Set<String> onlineSegments) {
+    onExternalViewChange(externalView, idealState, onlineSegments);
+  }
+
+  @Override
+  synchronized public void onExternalViewChange(ExternalView externalView, 
IdealState idealState, Set<String> onlineSegments) {
+    List<String> newSegments = new ArrayList<>();
+    for (String onlineSegment : onlineSegments) {
+      if (!_segmentToTimeRangeMSCache.containsKey(onlineSegment)) {

Review comment:
       Are we covering the following case?
   
   1. `segment_0` is uploaded with `start_time=10, end_time=12`.
   2. Due to an issue in the offline pipeline, we refresh `segment_0` 
with `start_time=10, end_time=11`.
   
   In this case, `_segmentToTimeRangeMSCache` must be updated with 
the new interval; otherwise, the pruner will return wrong results. I think the 
partition pruner does not currently handle this case either. We should at least 
add a `TODO` comment to handle it in the future.
   
   One possible approach is to add TTL expiration to the cache to make sure the 
interval eventually gets updated to the correct value. 

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/intervalST/Interval.java
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentpruner.intervalST;
+
+import com.google.common.base.Preconditions;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * The {@code Interval} class represents an one-dimensional closed interval 
which contains both ends.
+ */
+public class Interval implements Comparable<Interval> {
+  public final long _min;
+  public final long _max;
+
+  public Interval(long min, long max) {
+    Preconditions.checkState(min <= max, "invalid interval [{}, {}]", min, 
max);
+    _min = min;
+    _max = max;
+  }
+
+  public boolean intersects(@NotNull Interval that) {
+    return _max >= that._min && that._max >= _min;
+  }
+
+  @Override
+  public int compareTo(Interval that) {
+    Preconditions.checkNotNull(that, "Compare to invalid interval: null");
+    if (_min < that._min) {
+      return -1;
+    } else if (_min > that._min) {
+      return 1;
+    } else if (_max < that._max) {
+      return -1;
+    } else if (_max > that._max) {
+      return 1;
+    }
+    else return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)(_min * 17 + _max);
+  }
+
+  @Override
+  public boolean equals(Object that) {

Review comment:
       Is this the IntelliJ auto-generated `equals()`? If not, let's use the one 
generated by IntelliJ.

##########
File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/intervalST/Interval.java
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentpruner.intervalST;
+
+import com.google.common.base.Preconditions;
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * The {@code Interval} class represents an one-dimensional closed interval 
which contains both ends.
+ */
+public class Interval implements Comparable<Interval> {
+  public final long _min;
+  public final long _max;
+
+  public Interval(long min, long max) {
+    Preconditions.checkState(min <= max, "invalid interval [{}, {}]", min, 
max);
+    _min = min;
+    _max = max;
+  }
+
+  public boolean intersects(@NotNull Interval that) {
+    return _max >= that._min && that._max >= _min;
+  }
+
+  @Override
+  public int compareTo(Interval that) {
+    Preconditions.checkNotNull(that, "Compare to invalid interval: null");
+    if (_min < that._min) {
+      return -1;
+    } else if (_min > that._min) {
+      return 1;
+    } else if (_max < that._max) {
+      return -1;
+    } else if (_max > that._max) {
+      return 1;
+    }
+    else return 0;
+  }
+
+  @Override
+  public int hashCode() {

Review comment:
       Is this auto-generated by IntelliJ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to