This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 48deafdf94 create segment zk metadata cache (#10455)
48deafdf94 is described below
commit 48deafdf944fd320ba3663ee9579c353e1d03c93
Author: Rong Rong <[email protected]>
AuthorDate: Fri Mar 31 07:39:02 2023 -0700
create segment zk metadata cache (#10455)
* [partition] add partition routing to routing manager
- add SegmentZkMetadataFetchListener interface
- maintain a seen list of onlineSegments so not pull ZK when unnecessary
- when no pruner registered, do not pull ZK
- refactor Segment ZNRecord refresher
- adding tests
---------
Co-authored-by: Rong Rong <[email protected]>
---
.../pinot/broker/routing/BrokerRoutingManager.java | 31 ++--
.../SegmentZkMetadataFetchListener.java} | 25 +--
.../segmentmetadata/SegmentZkMetadataFetcher.java | 122 ++++++++++++++
.../routing/segmentpruner/EmptySegmentPruner.java | 40 ++---
.../MultiPartitionColumnsSegmentPruner.java | 43 ++---
.../routing/segmentpruner/SegmentPruner.java | 24 +--
.../segmentpruner/SegmentPrunerFactory.java | 33 +++-
.../SinglePartitionColumnSegmentPruner.java | 42 ++---
.../routing/segmentpruner/TimeSegmentPruner.java | 55 ++-----
.../SegmentZkMetadataFetcherTest.java | 175 +++++++++++++++++++++
.../routing/segmentpruner/SegmentPrunerTest.java | 86 ++++++----
11 files changed, 467 insertions(+), 209 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 883e9cfb02..93edc45f52 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -43,6 +43,8 @@ import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSele
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelectorFactory;
import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
import
org.apache.pinot.broker.routing.instanceselector.InstanceSelectorFactory;
+import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
+import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
import
org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory;
import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
@@ -430,10 +432,10 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
Set<String> preSelectedOnlineSegments =
segmentPreSelector.preSelect(onlineSegments);
SegmentSelector segmentSelector =
SegmentSelectorFactory.getSegmentSelector(tableConfig);
segmentSelector.init(idealState, externalView, preSelectedOnlineSegments);
+
+ // Register segment pruners and initialize segment zk metadata fetcher.
List<SegmentPruner> segmentPruners =
SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- for (SegmentPruner segmentPruner : segmentPruners) {
- segmentPruner.init(idealState, externalView, preSelectedOnlineSegments);
- }
+
AdaptiveServerSelector adaptiveServerSelector =
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
InstanceSelector instanceSelector =
@@ -488,10 +490,16 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
QueryConfig queryConfig = tableConfig.getQueryConfig();
Long queryTimeoutMs = queryConfig != null ? queryConfig.getTimeoutMs() :
null;
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(tableNameWithType, _propertyStore);
+ for (SegmentZkMetadataFetchListener listener : segmentPruners) {
+ segmentZkMetadataFetcher.register(listener);
+ }
+ segmentZkMetadataFetcher.init(idealState, externalView,
preSelectedOnlineSegments);
+
RoutingEntry routingEntry =
new RoutingEntry(tableNameWithType, idealStatePath, externalViewPath,
segmentPreSelector, segmentSelector,
- segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, timeBoundaryManager,
- queryTimeoutMs);
+ segmentPruners, instanceSelector, idealStateVersion,
externalViewVersion, segmentZkMetadataFetcher,
+ timeBoundaryManager, queryTimeoutMs);
if (_routingEntryMap.put(tableNameWithType, routingEntry) == null) {
LOGGER.info("Built routing for table: {}", tableNameWithType);
} else {
@@ -638,6 +646,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
final List<SegmentPruner> _segmentPruners;
final InstanceSelector _instanceSelector;
final Long _queryTimeoutMs;
+ final SegmentZkMetadataFetcher _segmentZkMetadataFetcher;
// Cache IdealState and ExternalView version for the last update
transient int _lastUpdateIdealStateVersion;
@@ -648,7 +657,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
RoutingEntry(String tableNameWithType, String idealStatePath, String
externalViewPath,
SegmentPreSelector segmentPreSelector, SegmentSelector
segmentSelector, List<SegmentPruner> segmentPruners,
InstanceSelector instanceSelector, int lastUpdateIdealStateVersion,
int lastUpdateExternalViewVersion,
- @Nullable TimeBoundaryManager timeBoundaryManager, @Nullable Long
queryTimeoutMs) {
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher, @Nullable
TimeBoundaryManager timeBoundaryManager,
+ @Nullable Long queryTimeoutMs) {
_tableNameWithType = tableNameWithType;
_idealStatePath = idealStatePath;
_externalViewPath = externalViewPath;
@@ -660,6 +670,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
_lastUpdateExternalViewVersion = lastUpdateExternalViewVersion;
_timeBoundaryManager = timeBoundaryManager;
_queryTimeoutMs = queryTimeoutMs;
+ _segmentZkMetadataFetcher = segmentZkMetadataFetcher;
}
String getTableNameWithType() {
@@ -693,10 +704,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
void onAssignmentChange(IdealState idealState, ExternalView externalView) {
Set<String> onlineSegments = getOnlineSegments(idealState);
Set<String> preSelectedOnlineSegments =
_segmentPreSelector.preSelect(onlineSegments);
+ _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
_segmentSelector.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
- for (SegmentPruner segmentPruner : _segmentPruners) {
- segmentPruner.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
- }
_instanceSelector.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
if (_timeBoundaryManager != null) {
_timeBoundaryManager.onAssignmentChange(idealState, externalView,
preSelectedOnlineSegments);
@@ -710,9 +719,7 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
}
void refreshSegment(String segment) {
- for (SegmentPruner segmentPruner : _segmentPruners) {
- segmentPruner.refreshSegment(segment);
- }
+ _segmentZkMetadataFetcher.refreshSegment(segment);
if (_timeBoundaryManager != null) {
_timeBoundaryManager.refreshSegment(segment);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
similarity index 68%
copy from
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
copy to
pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
index 5893e6bd92..138291089d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetchListener.java
@@ -16,40 +16,41 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.broker.routing.segmentpruner;
+package org.apache.pinot.broker.routing.segmentmetadata;
+import java.util.List;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
-import org.apache.pinot.common.request.BrokerRequest;
/**
- * The segment pruner prunes the selected segments based on the query.
+ * Interface to register with {@link SegmentZkMetadataFetcher}.
+ *
+ * <p>When registered, SegmentZKMetadataFetcher will fetch {@link ZNRecord}
for associated {@code onlineSegments} list
+ * or refreshed {@code segment}. Thus batch up all ZK access for segment
metadata.
*/
-public interface SegmentPruner {
+public interface SegmentZkMetadataFetchListener {
/**
* Initializes the segment pruner with the ideal state, external view and
online segments (segments with
* ONLINE/CONSUMING instances in the ideal state and pre-selected by the
{@link SegmentPreSelector}). Should be called
* only once before calling other methods.
*/
- void init(IdealState idealState, ExternalView externalView, Set<String>
onlineSegments);
+ void init(IdealState idealState, ExternalView externalView, List<String>
onlineSegments, List<ZNRecord> znRecords);
/**
* Processes the segment assignment (ideal state or external view) change
based on the given online segments (segments
* with ONLINE/CONSUMING instances in the ideal state and pre-selected by
the {@link SegmentPreSelector}).
*/
- void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments);
+ void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
+ List<String> pulledSegments, List<ZNRecord> znRecords);
/**
* Refreshes the metadata for the given segment (called when segment is
getting refreshed).
*/
- void refreshSegment(String segment);
-
- /**
- * Prunes the segments queried by the given broker request, returns the
selected segments to be queried.
- */
- Set<String> prune(BrokerRequest brokerRequest, Set<String> segments);
+ void refreshSegment(String segment, @Nullable ZNRecord znRecord);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
new file mode 100644
index 0000000000..de9ec0a699
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcher.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentmetadata;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.AccessOption;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+
+
+/**
+ * {@code SegmentZkMetadataFetcher} is used to cache {@link ZNRecord} stored
in {@link ZkHelixPropertyStore} for
+ * segments.
+ */
+public class SegmentZkMetadataFetcher {
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private final String _segmentZKMetadataPathPrefix;
+ private final List<SegmentZkMetadataFetchListener> _listeners;
+ private final Set<String> _onlineSegmentsCached;
+
+ private boolean _initialized;
+
+ public SegmentZkMetadataFetcher(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ _tableNameWithType = tableNameWithType;
+ _propertyStore = propertyStore;
+ _segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) +
"/";
+ _listeners = new ArrayList<>();
+ _onlineSegmentsCached = new HashSet<>();
+ _initialized = false;
+ }
+
+ public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
+ if (!_initialized) {
+ _initialized = true;
+ if (!_listeners.isEmpty()) {
+ // Bulk load partition info for all online segments
+ int numSegments = onlineSegments.size();
+ List<String> segments = new ArrayList<>(numSegments);
+ List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+ for (String segment : onlineSegments) {
+ segments.add(segment);
+ segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+ }
+ _onlineSegmentsCached.addAll(onlineSegments);
+ List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
+ for (SegmentZkMetadataFetchListener listener : _listeners) {
+ listener.init(idealState, externalView, segments, znRecords);
+ }
+ }
+ } else {
+ throw new RuntimeException("Segment zk metadata fetcher has already been
initialized!");
+ }
+ }
+
+ public void register(SegmentZkMetadataFetchListener listener) {
+ if (!_initialized) {
+ _listeners.add(listener);
+ } else {
+ throw new RuntimeException("Segment zk metadata fetcher has already been
initialized! "
+ + "Unable to register more listeners.");
+ }
+ }
+
+ public List<SegmentZkMetadataFetchListener> getListeners() {
+ return _listeners;
+ }
+
+ public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
+ Set<String> onlineSegments) {
+ if (!_listeners.isEmpty()) {
+ int numSegments = onlineSegments.size();
+ List<String> segments = new ArrayList<>(numSegments);
+ List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+
+ for (String segment : onlineSegments) {
+ if (_onlineSegmentsCached.add(segment)) {
+ segments.add(segment);
+ segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+ }
+ }
+ _onlineSegmentsCached.addAll(onlineSegments);
+ List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
+ for (SegmentZkMetadataFetchListener listener : _listeners) {
+ listener.onAssignmentChange(idealState, externalView, onlineSegments,
segments, znRecords);
+ }
+ _onlineSegmentsCached.retainAll(onlineSegments);
+ }
+ }
+
+ public synchronized void refreshSegment(String segment) {
+ if (!_listeners.isEmpty()) {
+ ZNRecord znRecord = _propertyStore.get(_segmentZKMetadataPathPrefix +
segment, null, AccessOption.PERSISTENT);
+ for (SegmentZkMetadataFetchListener listener : _listeners) {
+ listener.refreshSegment(segment, znRecord);
+ }
+ _onlineSegmentsCached.add(segment);
+ }
+ }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
index 7a7b66b086..aeb6a01f2e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
@@ -18,18 +18,14 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -45,35 +41,23 @@ public class EmptySegmentPruner implements SegmentPruner {
private static final Logger LOGGER =
LoggerFactory.getLogger(EmptySegmentPruner.class);
private final String _tableNameWithType;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final String _segmentZKMetadataPathPrefix;
private final Set<String> _segmentsLoaded = new HashSet<>();
private final Set<String> _emptySegments = ConcurrentHashMap.newKeySet();
private volatile ResultCache _resultCache;
- public EmptySegmentPruner(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public EmptySegmentPruner(TableConfig tableConfig) {
_tableNameWithType = tableConfig.getTableName();
- _propertyStore = propertyStore;
- _segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) +
"/";
}
@Override
- public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
+ public void init(IdealState idealState, ExternalView externalView,
List<String> onlineSegments,
+ List<ZNRecord> znRecords) {
// Bulk load info for all online segments
- int numSegments = onlineSegments.size();
- List<String> segments = new ArrayList<>(numSegments);
- List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
- for (String segment : onlineSegments) {
- segments.add(segment);
- segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
- }
- _segmentsLoaded.addAll(segments);
- List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
- for (int i = 0; i < numSegments; i++) {
- String segment = segments.get(i);
- if (isEmpty(segment, znRecords.get(i))) {
+ for (int idx = 0; idx < onlineSegments.size(); idx++) {
+ String segment = onlineSegments.get(idx);
+ if (isEmpty(segment, znRecords.get(idx))) {
_emptySegments.add(segment);
}
}
@@ -81,14 +65,14 @@ public class EmptySegmentPruner implements SegmentPruner {
@Override
public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
- Set<String> onlineSegments) {
+ Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord>
znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external
view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
boolean emptySegmentsChanged = false;
- for (String segment : onlineSegments) {
+ for (int idx = 0; idx < pulledSegments.size(); idx++) {
+ String segment = pulledSegments.get(idx);
if (_segmentsLoaded.add(segment)) {
- if (isEmpty(segment,
- _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null,
AccessOption.PERSISTENT))) {
+ if (isEmpty(segment, znRecords.get(idx))) {
emptySegmentsChanged |= _emptySegments.add(segment);
}
}
@@ -103,9 +87,9 @@ public class EmptySegmentPruner implements SegmentPruner {
}
@Override
- public synchronized void refreshSegment(String segment) {
+ public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
_segmentsLoaded.add(segment);
- if (isEmpty(segment, _propertyStore.get(_segmentZKMetadataPathPrefix +
segment, null, AccessOption.PERSISTENT))) {
+ if (isEmpty(segment, znRecord)) {
if (_emptySegments.add(segment)) {
// Reset the result cache when empty segments changed
_resultCache = null;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
index dd4edb686b..2f3216cbf2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
@@ -19,7 +19,6 @@
package org.apache.pinot.broker.routing.segmentpruner;
import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,12 +27,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
@@ -58,33 +54,21 @@ public class MultiPartitionColumnsSegmentPruner implements
SegmentPruner {
private final String _tableNameWithType;
private final Set<String> _partitionColumns;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final String _segmentZKMetadataPathPrefix;
private final Map<String, Map<String, PartitionInfo>>
_segmentColumnPartitionInfoMap = new ConcurrentHashMap<>();
- public MultiPartitionColumnsSegmentPruner(String tableNameWithType,
Set<String> partitionColumns,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public MultiPartitionColumnsSegmentPruner(String tableNameWithType,
Set<String> partitionColumns) {
_tableNameWithType = tableNameWithType;
_partitionColumns = partitionColumns;
- _propertyStore = propertyStore;
- _segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) +
"/";
}
@Override
- public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
+ public void init(IdealState idealState, ExternalView externalView,
List<String> onlineSegments,
+ List<ZNRecord> znRecords) {
// Bulk load partition info for all online segments
- int numSegments = onlineSegments.size();
- List<String> segments = new ArrayList<>(numSegments);
- List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
- for (String segment : onlineSegments) {
- segments.add(segment);
- segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
- }
- List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
- for (int i = 0; i < numSegments; i++) {
- String segment = segments.get(i);
+ for (int idx = 0; idx < onlineSegments.size(); idx++) {
+ String segment = onlineSegments.get(idx);
Map<String, PartitionInfo> columnPartitionInfoMap =
- extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment,
znRecords.get(i));
+ extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment,
znRecords.get(idx));
if (columnPartitionInfoMap != null) {
_segmentColumnPartitionInfoMap.put(segment, columnPartitionInfoMap);
}
@@ -143,21 +127,22 @@ public class MultiPartitionColumnsSegmentPruner
implements SegmentPruner {
@Override
public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
- Set<String> onlineSegments) {
+ Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord>
znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external
view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
- for (String segment : onlineSegments) {
+ for (int idx = 0; idx < pulledSegments.size(); idx++) {
+ String segment = pulledSegments.get(idx);
+ ZNRecord znRecord = znRecords.get(idx);
_segmentColumnPartitionInfoMap.computeIfAbsent(segment,
- k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k,
- _propertyStore.get(_segmentZKMetadataPathPrefix + k, null,
AccessOption.PERSISTENT)));
+ k -> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(k,
znRecord));
}
_segmentColumnPartitionInfoMap.keySet().retainAll(onlineSegments);
}
@Override
- public synchronized void refreshSegment(String segment) {
- Map<String, PartitionInfo> columnPartitionInfo =
extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment,
- _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null,
AccessOption.PERSISTENT));
+ public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
+ Map<String, PartitionInfo> columnPartitionInfo =
+ extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(segment,
znRecord);
if (columnPartitionInfo != null) {
_segmentColumnPartitionInfoMap.put(segment, columnPartitionInfo);
} else {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
index 5893e6bd92..17e92e5c15 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
@@ -19,34 +19,14 @@
package org.apache.pinot.broker.routing.segmentpruner;
import java.util.Set;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
import org.apache.pinot.common.request.BrokerRequest;
/**
* The segment pruner prunes the selected segments based on the query.
*/
-public interface SegmentPruner {
-
- /**
- * Initializes the segment pruner with the ideal state, external view and
online segments (segments with
- * ONLINE/CONSUMING instances in the ideal state and pre-selected by the
{@link SegmentPreSelector}). Should be called
- * only once before calling other methods.
- */
- void init(IdealState idealState, ExternalView externalView, Set<String>
onlineSegments);
-
- /**
- * Processes the segment assignment (ideal state or external view) change
based on the given online segments (segments
- * with ONLINE/CONSUMING instances in the ideal state and pre-selected by
the {@link SegmentPreSelector}).
- */
- void onAssignmentChange(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments);
-
- /**
- * Refreshes the metadata for the given segment (called when segment is
getting refreshed).
- */
- void refreshSegment(String segment);
+public interface SegmentPruner extends SegmentZkMetadataFetchListener {
/**
* Prunes the segments queried by the given broker request, returns the
selected segments to be queried.
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index f34f55f3c3..6135982e18 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -26,6 +28,7 @@ import javax.annotation.Nullable;
import org.apache.commons.collections.MapUtils;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -33,6 +36,9 @@ import
org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +58,7 @@ public class SegmentPrunerFactory {
boolean needsEmptySegment =
TableConfigUtils.needsEmptySegmentPruner(tableConfig);
if (needsEmptySegment) {
// Add EmptySegmentPruner if needed
- segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+ segmentPruners.add(new EmptySegmentPruner(tableConfig));
}
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
@@ -113,8 +119,8 @@ public class SegmentPrunerFactory {
LOGGER.info("Using PartitionSegmentPruner on partition columns: {} for
table: {}", partitionColumns,
tableNameWithType);
return partitionColumns.size() == 1 ? new
SinglePartitionColumnSegmentPruner(tableNameWithType,
- partitionColumns.iterator().next(), propertyStore)
- : new MultiPartitionColumnsSegmentPruner(tableNameWithType,
partitionColumns, propertyStore);
+ partitionColumns.iterator().next())
+ : new MultiPartitionColumnsSegmentPruner(tableNameWithType,
partitionColumns);
}
@Nullable
@@ -131,9 +137,26 @@ public class SegmentPrunerFactory {
LOGGER.warn("Cannot enable time range pruning without time column for
table: {}", tableNameWithType);
return null;
}
+ return createTimeSegmentPruner(tableConfig, propertyStore);
+ }
+
+ @VisibleForTesting
+ static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
+ ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ String tableNameWithType = tableConfig.getTableName();
+ String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
+ Preconditions.checkNotNull(timeColumn, "Time column must be configured in
table config for table: %s",
+ tableNameWithType);
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore,
tableNameWithType);
+ Preconditions.checkNotNull(schema, "Failed to find schema for table: %s",
tableNameWithType);
+ DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
+ Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in
schema for time column: %s of table: %s",
+ timeColumn, tableNameWithType);
+ DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
- LOGGER.info("Using TimeRangePruner on time column: {} for table: {}",
timeColumn, tableNameWithType);
- return new TimeSegmentPruner(tableConfig, propertyStore);
+ LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with
DateTimeFormatSpec: {}",
+ timeColumn, tableNameWithType, timeFormatSpec);
+ return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
}
private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner>
pruners) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
index 2c75782ef3..2959537706 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
@@ -18,19 +18,15 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
-import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
@@ -55,32 +51,20 @@ public class SinglePartitionColumnSegmentPruner implements
SegmentPruner {
private final String _tableNameWithType;
private final String _partitionColumn;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final String _segmentZKMetadataPathPrefix;
private final Map<String, PartitionInfo> _partitionInfoMap = new
ConcurrentHashMap<>();
- public SinglePartitionColumnSegmentPruner(String tableNameWithType, String
partitionColumn,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public SinglePartitionColumnSegmentPruner(String tableNameWithType, String
partitionColumn) {
_tableNameWithType = tableNameWithType;
_partitionColumn = partitionColumn;
- _propertyStore = propertyStore;
- _segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) +
"/";
}
@Override
- public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
+ public void init(IdealState idealState, ExternalView externalView,
List<String> onlineSegments,
+ List<ZNRecord> znRecords) {
// Bulk load partition info for all online segments
- int numSegments = onlineSegments.size();
- List<String> segments = new ArrayList<>(numSegments);
- List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
- for (String segment : onlineSegments) {
- segments.add(segment);
- segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
- }
- List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
- for (int i = 0; i < numSegments; i++) {
- String segment = segments.get(i);
- PartitionInfo partitionInfo =
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+ for (int idx = 0; idx < onlineSegments.size(); idx++) {
+ String segment = onlineSegments.get(idx);
+ PartitionInfo partitionInfo =
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(idx));
if (partitionInfo != null) {
_partitionInfoMap.put(segment, partitionInfo);
}
@@ -129,20 +113,20 @@ public class SinglePartitionColumnSegmentPruner
implements SegmentPruner {
@Override
public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
- Set<String> onlineSegments) {
+ Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord>
znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external
view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
- for (String segment : onlineSegments) {
- _partitionInfoMap.computeIfAbsent(segment, k ->
extractPartitionInfoFromSegmentZKMetadataZNRecord(k,
- _propertyStore.get(_segmentZKMetadataPathPrefix + k, null,
AccessOption.PERSISTENT)));
+ for (int idx = 0; idx < pulledSegments.size(); idx++) {
+ String segment = pulledSegments.get(idx);
+ ZNRecord znRecord = znRecords.get(idx);
+ _partitionInfoMap.computeIfAbsent(segment, k ->
extractPartitionInfoFromSegmentZKMetadataZNRecord(k, znRecord));
}
_partitionInfoMap.keySet().retainAll(onlineSegments);
}
@Override
- public synchronized void refreshSegment(String segment) {
- PartitionInfo partitionInfo =
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment,
- _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null,
AccessOption.PERSISTENT));
+ public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
+ PartitionInfo partitionInfo =
extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecord);
if (partitionInfo != null) {
_partitionInfoMap.put(segment, partitionInfo);
} else {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index c123c65a67..a7ac4fce4b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,22 +28,17 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.AccessOption;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpruner.interval.Interval;
import org.apache.pinot.broker.routing.segmentpruner.interval.IntervalTree;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
import org.apache.pinot.sql.FilterKind;
@@ -64,44 +58,25 @@ public class TimeSegmentPruner implements SegmentPruner {
private static final Interval DEFAULT_INTERVAL = new
Interval(MIN_START_TIME, MAX_END_TIME);
private final String _tableNameWithType;
- private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- private final String _segmentZKMetadataPathPrefix;
private final String _timeColumn;
private final DateTimeFormatSpec _timeFormatSpec;
private volatile IntervalTree<String> _intervalTree;
private final Map<String, Interval> _intervalMap = new HashMap<>();
- public TimeSegmentPruner(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ public TimeSegmentPruner(TableConfig tableConfig, String timeColumn,
DateTimeFormatSpec timeFormatSpec) {
_tableNameWithType = tableConfig.getTableName();
- _propertyStore = propertyStore;
- _segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) +
"/";
- _timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
- Preconditions.checkNotNull(_timeColumn, "Time column must be configured in
table config for table: %s",
- _tableNameWithType);
-
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
_tableNameWithType);
- Preconditions.checkNotNull(schema, "Failed to find schema for table: %s",
_tableNameWithType);
- DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
- Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in
schema for time column: %s of table: %s",
- _timeColumn, _tableNameWithType);
- _timeFormatSpec = dateTimeSpec.getFormatSpec();
+ _timeColumn = timeColumn;
+ _timeFormatSpec = timeFormatSpec;
}
@Override
- public void init(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments) {
+ public void init(IdealState idealState, ExternalView externalView,
List<String> onlineSegments,
+ List<ZNRecord> znRecords) {
// Bulk load time info for all online segments
- int numSegments = onlineSegments.size();
- List<String> segments = new ArrayList<>(numSegments);
- List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
- for (String segment : onlineSegments) {
- segments.add(segment);
- segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
- }
- List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths,
null, AccessOption.PERSISTENT, false);
- for (int i = 0; i < numSegments; i++) {
- String segment = segments.get(i);
- Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment,
znRecords.get(i));
+ for (int idx = 0; idx < onlineSegments.size(); idx++) {
+ String segment = onlineSegments.get(idx);
+ Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment,
znRecords.get(idx));
_intervalMap.put(segment, interval);
}
_intervalTree = new IntervalTree<>(_intervalMap);
@@ -128,21 +103,21 @@ public class TimeSegmentPruner implements SegmentPruner {
@Override
public synchronized void onAssignmentChange(IdealState idealState,
ExternalView externalView,
- Set<String> onlineSegments) {
+ Set<String> onlineSegments, List<String> pulledSegments, List<ZNRecord>
znRecords) {
// NOTE: We don't update all the segment ZK metadata for every external
view change, but only the new added/removed
// ones. The refreshed segment ZK metadata change won't be picked up.
- for (String segment : onlineSegments) {
- _intervalMap.computeIfAbsent(segment, k ->
extractIntervalFromSegmentZKMetaZNRecord(k,
- _propertyStore.get(_segmentZKMetadataPathPrefix + k, null,
AccessOption.PERSISTENT)));
+ for (int idx = 0; idx < pulledSegments.size(); idx++) {
+ String segment = pulledSegments.get(idx);
+ ZNRecord zNrecord = znRecords.get(idx);
+ _intervalMap.computeIfAbsent(segment, k ->
extractIntervalFromSegmentZKMetaZNRecord(k, zNrecord));
}
_intervalMap.keySet().retainAll(onlineSegments);
_intervalTree = new IntervalTree<>(_intervalMap);
}
@Override
- public synchronized void refreshSegment(String segment) {
- Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment,
- _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null,
AccessOption.PERSISTENT));
+ public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
+ Interval interval = extractIntervalFromSegmentZKMetaZNRecord(segment,
znRecord);
_intervalMap.put(segment, interval);
_intervalTree = new IntervalTree<>(_intervalMap);
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
new file mode 100644
index 0000000000..a63811dccd
--- /dev/null
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentmetadata/SegmentZkMetadataFetcherTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentmetadata;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+
+public class SegmentZkMetadataFetcherTest extends ControllerTest {
+ private static final String OFFLINE_TABLE_NAME = "testTable_OFFLINE";
+
+ @Test
+ public void
testSegmentZkMetadataFetcherShouldNotAllowIncorrectRegisterOrInitBehavior() {
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
Mockito.mock(ZkHelixPropertyStore.class);
+ IdealState idealState = Mockito.mock(IdealState.class);
+ ExternalView externalView = Mockito.mock(ExternalView.class);
+
+ // empty listener at beginning
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+ mockPropertyStore);
+ assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0);
+
+ // should allow register new listener
+
segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class));
+ assertEquals(segmentZkMetadataFetcher.getListeners().size(), 1);
+
+ // should not allow register new listener once initialized
+ segmentZkMetadataFetcher.init(idealState, externalView,
Collections.singleton("foo"));
+ try {
+
segmentZkMetadataFetcher.register(mock(SegmentZkMetadataFetchListener.class));
+ fail();
+ } catch (RuntimeException rte) {
+ assertTrue(rte.getMessage().contains("has already been initialized"));
+ }
+
+ // should not allow duplicate init either
+ try {
+ segmentZkMetadataFetcher.init(idealState, externalView,
Collections.singleton("foo"));
+ fail();
+ } catch (RuntimeException rte) {
+ assertTrue(rte.getMessage().contains("has already been initialized"));
+ }
+ }
+
+ @Test
+ public void
testSegmentZkMetadataFetcherShouldNotPullZkWhenNoPrunerRegistered() {
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
Mockito.mock(ZkHelixPropertyStore.class);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+ mockPropertyStore);
+ // NOTE: Ideal state and external view are not used in the current
implementation
+ IdealState idealState = Mockito.mock(IdealState.class);
+ ExternalView externalView = Mockito.mock(ExternalView.class);
+
+ assertEquals(segmentZkMetadataFetcher.getListeners().size(), 0);
+ segmentZkMetadataFetcher.init(idealState, externalView,
Collections.singleton("foo"));
+ Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(),
anyBoolean());
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
Collections.singleton("foo"));
+ Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(),
anyBoolean());
+ segmentZkMetadataFetcher.refreshSegment("foo");
+ Mockito.verify(mockPropertyStore, times(0)).get(any(), any(), anyInt(),
anyBoolean());
+ }
+
+ @Test
+ public void
testSegmentZkMetadataFetcherShouldPullZkOnlyOncePerSegmentWhenMultiplePrunersRegistered()
{
+ ZkHelixPropertyStore<ZNRecord> mockPropertyStore =
mock(ZkHelixPropertyStore.class);
+ when(mockPropertyStore.get(any(), any(), anyInt(),
anyBoolean())).thenAnswer(inv -> {
+ List<String> pathList = inv.getArgument(0);
+ List<ZNRecord> result = new ArrayList<>(pathList.size());
+ for (String path : pathList) {
+ String[] pathParts = path.split("/");
+ String segmentName = pathParts[pathParts.length - 1];
+ SegmentZKMetadata fakeSegmentZkMetadata = new
SegmentZKMetadata(segmentName);
+ result.add(fakeSegmentZkMetadata.toZNRecord());
+ }
+ return result;
+ });
+ SegmentPruner pruner1 = mock(SegmentPruner.class);
+ SegmentPruner pruner2 = mock(SegmentPruner.class);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+ mockPropertyStore);
+ segmentZkMetadataFetcher.register(pruner1);
+ segmentZkMetadataFetcher.register(pruner2);
+ // NOTE: Ideal state and external view are not used in the current
implementation
+ IdealState idealState = mock(IdealState.class);
+ ExternalView externalView = mock(ExternalView.class);
+
+ assertEquals(segmentZkMetadataFetcher.getListeners().size(), 2);
+ // should call property store once for "foo" and "bar" as a batch
+ segmentZkMetadataFetcher.init(idealState, externalView,
ImmutableSet.of("foo", "bar"));
+ verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("foo",
"bar")), any(), anyInt(), anyBoolean());
+ verify(pruner1, times(1)).init(any(), any(), argThat(new
ListMatcher("foo", "bar")), any());
+ verify(pruner2, times(1)).init(any(), any(), argThat(new
ListMatcher("foo", "bar")), any());
+
+ // should call property store only once b/c "alice" was missing
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
ImmutableSet.of("bar", "alice"));
+ verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher("alice")),
any(), anyInt(), anyBoolean());
+ verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(),
argThat(new ListMatcher("alice")), any());
+ verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(),
argThat(new ListMatcher("alice")), any());
+
+ // should call property store once more b/c "foo" was cleared when
onAssignmentChange called with "bar" and "alice"
+ segmentZkMetadataFetcher.refreshSegment("foo");
+ verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt());
+ verify(pruner1, times(1)).refreshSegment(eq("foo"), any());
+ verify(pruner2, times(1)).refreshSegment(eq("foo"), any());
+ clearInvocations(mockPropertyStore, pruner1, pruner2);
+
+ // update with all existing segments will call into property store and
pruner with empty list
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
ImmutableSet.of("bar", "alice"));
+ verify(mockPropertyStore, times(1)).get(argThat(new ListMatcher()), any(),
anyInt(), anyBoolean());
+ verify(pruner1, times(1)).onAssignmentChange(any(), any(), any(),
argThat(new ListMatcher()), any());
+ verify(pruner2, times(1)).onAssignmentChange(any(), any(), any(),
argThat(new ListMatcher()), any());
+
+ // calling refresh will still force pull from property store
+ segmentZkMetadataFetcher.refreshSegment("foo");
+ verify(mockPropertyStore, times(1)).get(endsWith("foo"), any(), anyInt());
+ verify(pruner1, times(1)).refreshSegment(eq("foo"), any());
+ verify(pruner2, times(1)).refreshSegment(eq("foo"), any());
+ }
+
+ private static class ListMatcher implements ArgumentMatcher<List<String>> {
+ private final List<String> _valueToMatch;
+
+ private ListMatcher(String... values) {
+ _valueToMatch = Arrays.asList(values);
+ }
+
+ @Override
+ public boolean matches(List<String> arg) {
+ if (arg.size() != _valueToMatch.size()) {
+ return false;
+ }
+ for (int i = 0; i < arg.size(); i++) {
+ if (!arg.get(i).endsWith(_valueToMatch.get(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index cc60739d36..feaad35169 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -35,6 +35,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
+import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetcher;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -263,9 +264,12 @@ public class SegmentPrunerTest extends ControllerTest {
ExternalView externalView = Mockito.mock(ExternalView.class);
SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner =
- new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME,
PARTITION_COLUMN_1, _propertyStore);
+ new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME,
PARTITION_COLUMN_1);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
+ _propertyStore);
+ segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner);
Set<String> onlineSegments = new HashSet<>();
- singlePartitionColumnSegmentPruner.init(idealState, externalView,
onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
Collections.emptySet()),
Collections.emptySet());
assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
Collections.emptySet()),
@@ -289,7 +293,7 @@ public class SegmentPrunerTest extends ControllerTest {
new SegmentZKMetadata(segmentWithoutPartitionMetadata);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
segmentZKMetadataWithoutPartitionMetadata);
- singlePartitionColumnSegmentPruner.onAssignmentChange(idealState,
externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
new
HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
Collections.singletonList(segmentWithoutPartitionMetadata));
@@ -309,7 +313,7 @@ public class SegmentPrunerTest extends ControllerTest {
String segment1 = "segment1";
onlineSegments.add(segment1);
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4,
0);
- singlePartitionColumnSegmentPruner.onAssignmentChange(idealState,
externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(
singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -322,7 +326,7 @@ public class SegmentPrunerTest extends ControllerTest {
// Update partition metadata without refreshing should have no effect
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4,
1);
- singlePartitionColumnSegmentPruner.onAssignmentChange(idealState,
externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(
singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -334,7 +338,7 @@ public class SegmentPrunerTest extends ControllerTest {
new HashSet<>(Collections.singletonList(segment1)));
// Refresh the changed segment should update the segment pruner
- singlePartitionColumnSegmentPruner.refreshSegment(segment0);
+ segmentZkMetadataFetcher.refreshSegment(segment0);
assertEquals(
singlePartitionColumnSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -348,8 +352,10 @@ public class SegmentPrunerTest extends ControllerTest {
// Multi-column partitioned segment.
MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner =
new MultiPartitionColumnsSegmentPruner(OFFLINE_TABLE_NAME,
- Stream.of(PARTITION_COLUMN_1,
PARTITION_COLUMN_2).collect(Collectors.toSet()), _propertyStore);
- multiPartitionColumnsSegmentPruner.init(idealState, externalView,
onlineSegments);
+ Stream.of(PARTITION_COLUMN_1,
PARTITION_COLUMN_2).collect(Collectors.toSet()));
+ segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1,
Collections.emptySet()),
Collections.emptySet());
assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2,
Collections.emptySet()),
@@ -367,10 +373,10 @@ public class SegmentPrunerTest extends ControllerTest {
Map<String, String> partitionColumn2FunctionConfig = new HashMap<>();
partitionColumn2FunctionConfig.put("columnValues", "xyz|abc");
partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|");
- columnPartitionMetadataMap.put(PARTITION_COLUMN_2,
- new ColumnPartitionMetadata("BoundedColumnValue", 3,
Collections.singleton(1), partitionColumn2FunctionConfig));
+ columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new
ColumnPartitionMetadata(
+ "BoundedColumnValue", 3, Collections.singleton(1),
partitionColumn2FunctionConfig));
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2,
columnPartitionMetadataMap);
- multiPartitionColumnsSegmentPruner.onAssignmentChange(idealState,
externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(
multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
new HashSet<>(Arrays.asList(segment0, segment1)));
@@ -399,10 +405,13 @@ public class SegmentPrunerTest extends ControllerTest {
TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-
- TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
_propertyStore);
+ TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
+ _propertyStore);
Set<String> onlineSegments = new HashSet<>();
- segmentPruner.init(idealState, externalView, onlineSegments);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+ _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()),
Collections.emptySet());
assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()),
Collections.emptySet());
assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()),
Collections.emptySet());
@@ -413,10 +422,12 @@ public class SegmentPrunerTest extends ControllerTest {
// Initialize with non-empty onlineSegments
// Segments without metadata (not updated yet) should not be pruned
- segmentPruner = new TimeSegmentPruner(tableConfig, _propertyStore);
+ segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
_propertyStore);
+ segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
String newSegment = "newSegment";
onlineSegments.add(newSegment);
- segmentPruner.init(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(newSegment)),
Collections.singletonList(newSegment));
assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(newSegment)),
@@ -440,7 +451,7 @@ public class SegmentPrunerTest extends ControllerTest {
segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTimeRangeMetadata);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(
segmentPruner.prune(brokerRequest1, new
HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
Collections.singletonList(segmentWithoutTimeRangeMetadata));
@@ -476,7 +487,7 @@ public class SegmentPrunerTest extends ControllerTest {
onlineSegments.add(segment2);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65,
TimeUnit.DAYS);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -510,7 +521,7 @@ public class SegmentPrunerTest extends ControllerTest {
Collections.emptySet());
// Refresh the changed segment should update the segment pruner
- segmentPruner.refreshSegment(segment2);
+ segmentZkMetadataFetcher.refreshSegment(segment2);
assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -541,7 +552,10 @@ public class SegmentPrunerTest extends ControllerTest {
TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN);
- TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
_propertyStore);
+ TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+ _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore,
RAW_TABLE_NAME);
DateTimeFormatSpec dateTimeFormatSpec =
schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec();
@@ -561,7 +575,7 @@ public class SegmentPrunerTest extends ControllerTest {
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
dateTimeFormatSpec.fromFormatToMillis("20200401"),
dateTimeFormatSpec.fromFormatToMillis("20200430"),
TimeUnit.MILLISECONDS);
- segmentPruner.init(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments),
Collections.singleton(segment0));
assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment1)));
assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments),
Collections.singleton(segment1));
@@ -581,7 +595,10 @@ public class SegmentPrunerTest extends ControllerTest {
TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
- TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig,
_propertyStore);
+ TimeSegmentPruner segmentPruner =
SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+ _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
onlineSegments.add(segment0);
@@ -592,7 +609,7 @@ public class SegmentPrunerTest extends ControllerTest {
String segment2 = "segment2";
onlineSegments.add(segment2);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65,
TimeUnit.DAYS);
- segmentPruner.init(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment2)));
assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new
HashSet<>(Arrays.asList(segment0, segment1)));
@@ -610,7 +627,10 @@ public class SegmentPrunerTest extends ControllerTest {
TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME,
TableType.REALTIME);
// init with list of segments
- EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig,
_propertyStore);
+ EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
+ _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
onlineSegments.add(segment0);
@@ -618,7 +638,7 @@ public class SegmentPrunerTest extends ControllerTest {
String segment1 = "segment1";
onlineSegments.add(segment1);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
- segmentPruner.init(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1))),
new HashSet<>(Collections.singletonList(segment0)));
assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1))),
@@ -627,9 +647,11 @@ public class SegmentPrunerTest extends ControllerTest {
new HashSet<>(Collections.singletonList(segment0)));
// init with empty list of segments
- segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+ segmentPruner = new EmptySegmentPruner(tableConfig);
+ segmentZkMetadataFetcher = new
SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
+ segmentZkMetadataFetcher.register(segmentPruner);
onlineSegments.clear();
- segmentPruner.init(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()),
Collections.emptySet());
assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()),
Collections.emptySet());
assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()),
Collections.emptySet());
@@ -637,7 +659,7 @@ public class SegmentPrunerTest extends ControllerTest {
// Segments without metadata (not updated yet) should not be pruned
String newSegment = "newSegment";
onlineSegments.add(newSegment);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(newSegment)),
Collections.singleton(newSegment));
assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(newSegment)),
@@ -654,7 +676,7 @@ public class SegmentPrunerTest extends ControllerTest {
segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTotalDocsMetadata);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(segmentWithoutTotalDocsMetadata)),
Collections.singleton(segmentWithoutTotalDocsMetadata));
assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(segmentWithoutTotalDocsMetadata)),
@@ -667,7 +689,7 @@ public class SegmentPrunerTest extends ControllerTest {
String segmentWithNegativeTotalDocsMetadata =
"segmentWithNegativeTotalDocsMetadata";
onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME,
segmentWithNegativeTotalDocsMetadata, -1);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1,
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
Collections.singleton(segmentWithNegativeTotalDocsMetadata));
assertEquals(segmentPruner.prune(brokerRequest2,
Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
@@ -685,7 +707,7 @@ public class SegmentPrunerTest extends ControllerTest {
onlineSegments.add(segment2);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1);
- segmentPruner.onAssignmentChange(idealState, externalView, onlineSegments);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
new HashSet<>(Arrays.asList(segment0, segment2)));
assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
@@ -704,7 +726,7 @@ public class SegmentPrunerTest extends ControllerTest {
new HashSet<>(Arrays.asList(segment0, segment2)));
// Refresh the changed segment should update the segment pruner
- segmentPruner.refreshSegment(segment2);
+ segmentZkMetadataFetcher.refreshSegment(segment2);
assertEquals(segmentPruner.prune(brokerRequest1, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
new HashSet<>(Collections.singletonList(segment0)));
assertEquals(segmentPruner.prune(brokerRequest2, new
HashSet<>(Arrays.asList(segment0, segment1, segment2))),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]