This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf460aa8b3 Fix the bug of using push time to identify new created
segment (#11599)
cf460aa8b3 is described below
commit cf460aa8b39019910324fa28e77f8911cf2e06c6
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Sep 14 19:23:57 2023 -0700
Fix the bug of using push time to identify new created segment (#11599)
---
.../instanceselector/BaseInstanceSelector.java | 85 +++++++++++-----------
.../routing/instanceselector/InstanceSelector.java | 4 +-
.../routing/instanceselector/NewSegmentState.java | 14 ++--
.../StrictReplicaGroupInstanceSelector.java | 8 +-
.../SegmentPartitionMetadataManager.java | 76 ++++++++++---------
.../instanceselector/InstanceSelectorTest.java | 53 ++++++++------
.../SegmentPartitionMetadataManagerTest.java | 8 +-
.../common/metadata/segment/SegmentZKMetadata.java | 12 +++
.../apache/pinot/common/utils/SegmentUtils.java | 16 ++++
.../pinot/common/utils/SegmentUtilsTest.java | 38 ++++++++++
.../apache/pinot/query/routing/WorkerManager.java | 3 -
11 files changed, 197 insertions(+), 120 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 3a57530e08..972142663e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -43,6 +43,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.SegmentUtils;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,19 +61,19 @@ import org.slf4j.LoggerFactory;
* selection. When it is selected, we don't serve the new segment.
* <p>
* Definition of new segment:
- * 1) Segment pushed more than 5 minutes ago.
- * - If we first see a segment via initialization, we look up segment push
time from zookeeper.
+ * 1) Segment created within the last 5 minutes.
+ * - If we first see a segment via initialization, we look up segment creation
time from zookeeper.
* - If we first see a segment via onAssignmentChange initialization, we use
the calling time of onAssignmentChange
* as approximation.
* 2) We retire new segment as old when:
- * - The push time is more than 5 minutes ago
+ * - The creation time is more than 5 minutes ago
* - Any instance for new segment is in ERROR state
* - External view for segment converges with ideal state
*
* Note that this implementation means:
* 1) Inconsistent selection of new segments across queries (some queries will
serve new segments and others won't).
* 2) When there is no state update from helix, new segments won't be retired
because of the time passing (those with
- * push time more than 5 minutes ago).
+ * creation time more than 5 minutes ago).
* TODO: refresh new/old segment state where there is no update from helix for
long time.
*/
abstract class BaseInstanceSelector implements InstanceSelector {
@@ -109,8 +110,9 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
public void init(Set<String> enabledInstances, IdealState idealState,
ExternalView externalView,
Set<String> onlineSegments) {
_enabledInstances = enabledInstances;
- Map<String, Long> newSegmentPushTimeMap =
getNewSegmentPushTimeMapFromZK(idealState, externalView, onlineSegments);
- updateSegmentMaps(idealState, externalView, onlineSegments,
newSegmentPushTimeMap);
+ Map<String, Long> newSegmentCreationTimeMap =
+ getNewSegmentCreationTimeMapFromZK(idealState, externalView,
onlineSegments);
+ updateSegmentMaps(idealState, externalView, onlineSegments,
newSegmentCreationTimeMap);
refreshSegmentStates();
}
@@ -122,9 +124,9 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
/**
- * Returns a map from new segment to their push time based on the ZK
metadata.
+ * Returns a map from new segment to their creation time based on the ZK
metadata.
*/
- Map<String, Long> getNewSegmentPushTimeMapFromZK(IdealState idealState,
ExternalView externalView,
+ Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState,
ExternalView externalView,
Set<String> onlineSegments) {
List<String> potentialNewSegments = new ArrayList<>();
Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
@@ -136,9 +138,8 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
}
}
- // Use push time in ZK metadata to determine whether the potential new
segment is newly pushed
- Map<String, Long> newSegmentPushTimeMap = new HashMap<>();
- long nowMillis = _clock.millis();
+ Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
+ long currentTimeMs = _clock.millis();
String segmentZKMetadataPathPrefix =
ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) +
"/";
List<String> segmentZKMetadataPaths = new
ArrayList<>(potentialNewSegments.size());
@@ -151,14 +152,14 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
continue;
}
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(record);
- long pushTimeMillis = segmentZKMetadata.getPushTime();
- if (InstanceSelector.isNewSegment(pushTimeMillis, nowMillis)) {
- newSegmentPushTimeMap.put(segmentZKMetadata.getSegmentName(),
pushTimeMillis);
+ long creationTimeMs =
SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata);
+ if (InstanceSelector.isNewSegment(creationTimeMs, currentTimeMs)) {
+ newSegmentCreationTimeMap.put(segmentZKMetadata.getSegmentName(),
creationTimeMs);
}
}
LOGGER.info("Got {} new segments: {} for table: {} by reading ZK metadata,
current time: {}",
- newSegmentPushTimeMap.size(), newSegmentPushTimeMap,
_tableNameWithType, nowMillis);
- return newSegmentPushTimeMap;
+ newSegmentCreationTimeMap.size(), newSegmentCreationTimeMap,
_tableNameWithType, currentTimeMs);
+ return newSegmentCreationTimeMap;
}
/**
@@ -220,21 +221,21 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
* ONLINE/CONSUMING instances in the ideal state and pre-selected by the
{@link SegmentPreSelector}) and new segments.
* After this update:
* - Old segments' online instances should be tracked in
_oldSegmentCandidatesMap
- * - New segments' state (push time and candidate instances) should be
tracked in _newSegmentStateMap
+ * - New segments' state (creation time and candidate instances) should be
tracked in _newSegmentStateMap
*/
void updateSegmentMaps(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
- Map<String, Long> newSegmentPushTimeMap) {
+ Map<String, Long> newSegmentCreationTimeMap) {
_oldSegmentCandidatesMap.clear();
- _newSegmentStateMap = new
HashMap<>(HashUtil.getHashMapCapacity(newSegmentPushTimeMap.size()));
+ _newSegmentStateMap = new
HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()));
Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
for (String segment : onlineSegments) {
Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- Long newSegmentPushTimeMillis = newSegmentPushTimeMap.get(segment);
+ Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment);
Map<String, String> externalViewInstanceStateMap =
externalViewAssignment.get(segment);
if (externalViewInstanceStateMap == null) {
- if (newSegmentPushTimeMillis != null) {
+ if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
for (Map.Entry<String, String> entry :
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
@@ -242,14 +243,14 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
candidates.add(new SegmentInstanceCandidate(entry.getKey(),
false));
}
}
- _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentPushTimeMillis, candidates));
+ _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
// Old segment
_oldSegmentCandidatesMap.put(segment, Collections.emptyList());
}
} else {
TreeSet<String> onlineInstances =
getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap);
- if (newSegmentPushTimeMillis != null) {
+ if (newSegmentCreationTimeMs != null) {
// New segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
for (Map.Entry<String, String> entry :
convertToSortedMap(idealStateInstanceStateMap).entrySet()) {
@@ -258,7 +259,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance)));
}
}
- _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentPushTimeMillis, candidates));
+ _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMs, candidates));
} else {
// Old segment
List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
@@ -358,44 +359,44 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
*/
@Override
public void onAssignmentChange(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments) {
- Map<String, Long> newSegmentPushTimeMap =
- getNewSegmentPushTimeMapFromExistingStates(idealState, externalView,
onlineSegments);
- updateSegmentMaps(idealState, externalView, onlineSegments,
newSegmentPushTimeMap);
+ Map<String, Long> newSegmentCreationTimeMap =
+ getNewSegmentCreationTimeMapFromExistingStates(idealState,
externalView, onlineSegments);
+ updateSegmentMaps(idealState, externalView, onlineSegments,
newSegmentCreationTimeMap);
refreshSegmentStates();
}
/**
- * Returns a map from new segment to their push time based on the existing
in-memory states.
+ * Returns a map from new segment to their creation time based on the
existing in-memory states.
*/
- Map<String, Long> getNewSegmentPushTimeMapFromExistingStates(IdealState
idealState, ExternalView externalView,
+ Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState
idealState, ExternalView externalView,
Set<String> onlineSegments) {
- Map<String, Long> newSegmentPushTimeMap = new HashMap<>();
- long nowMillis = _clock.millis();
+ Map<String, Long> newSegmentCreationTimeMap = new HashMap<>();
+ long currentTimeMs = _clock.millis();
Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
for (String segment : onlineSegments) {
NewSegmentState newSegmentState = _newSegmentStateMap.get(segment);
- long pushTimeMillis = 0;
+ long creationTimeMs = 0;
if (newSegmentState != null) {
- // It was a new segment before, check the push time and segment state
to see if it is still a new segment
- if (InstanceSelector.isNewSegment(newSegmentState.getPushTimeMillis(),
nowMillis)) {
- pushTimeMillis = newSegmentState.getPushTimeMillis();
+ // It was a new segment before, check the creation time and segment
state to see if it is still a new segment
+ if (InstanceSelector.isNewSegment(newSegmentState.getCreationTimeMs(),
currentTimeMs)) {
+ creationTimeMs = newSegmentState.getCreationTimeMs();
}
} else if (!_oldSegmentCandidatesMap.containsKey(segment)) {
- // This is the first time we see this segment, use the current time as
the push time
- pushTimeMillis = nowMillis;
+ // This is the first time we see this segment, use the current time as
the creation time
+ creationTimeMs = currentTimeMs;
}
- // For recently pushed segment, check if it is qualified as new segment
- if (pushTimeMillis > 0) {
+ // For recently created segment, check if it is qualified as new segment
+ if (creationTimeMs > 0) {
assert idealStateAssignment.containsKey(segment);
if (isPotentialNewSegment(idealStateAssignment.get(segment),
externalViewAssignment.get(segment))) {
- newSegmentPushTimeMap.put(segment, pushTimeMillis);
+ newSegmentCreationTimeMap.put(segment, creationTimeMs);
}
}
}
LOGGER.info("Got {} new segments: {} for table: {} by processing existing
states, current time: {}",
- newSegmentPushTimeMap.size(), newSegmentPushTimeMap,
_tableNameWithType, nowMillis);
- return newSegmentPushTimeMap;
+ newSegmentCreationTimeMap.size(), newSegmentCreationTimeMap,
_tableNameWithType, currentTimeMs);
+ return newSegmentCreationTimeMap;
}
@Override
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4086babb51..9ffe830229 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -34,8 +34,8 @@ import org.apache.pinot.common.request.BrokerRequest;
public interface InstanceSelector {
long NEW_SEGMENT_EXPIRATION_MILLIS = TimeUnit.MINUTES.toMillis(5);
- static boolean isNewSegment(long pushMillis, long nowMillis) {
- return nowMillis - pushMillis <= NEW_SEGMENT_EXPIRATION_MILLIS;
+ static boolean isNewSegment(long creationTimeMs, long currentTimeMs) {
+ return creationTimeMs > 0 && currentTimeMs - creationTimeMs <=
NEW_SEGMENT_EXPIRATION_MILLIS;
}
/**
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/NewSegmentState.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/NewSegmentState.java
index c6c706e5a1..6cc186e7b8 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/NewSegmentState.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/NewSegmentState.java
@@ -27,22 +27,22 @@ import javax.annotation.concurrent.Immutable;
*/
@Immutable
public class NewSegmentState {
- // Segment push time. This could be
+ // Segment creation time. This could be
// 1) From ZK if we first see this segment via init call.
- // 2) Use wall time, if first see this segment from onAssignmentChange call.
- private final long _pushTimeMillis;
+ // 2) Use wall time if we first see this segment from onAssignmentChange
call.
+ private final long _creationTimeMs;
// List of SegmentInstanceCandidate: which contains instance name and online
flags.
// The candidates have to be in instance sorted order.
private final List<SegmentInstanceCandidate> _candidates;
- public NewSegmentState(long pushTimeMillis, List<SegmentInstanceCandidate>
candidates) {
- _pushTimeMillis = pushTimeMillis;
+ public NewSegmentState(long creationTimeMs, List<SegmentInstanceCandidate>
candidates) {
+ _creationTimeMs = creationTimeMs;
_candidates = candidates;
}
- public long getPushTimeMillis() {
- return _pushTimeMillis;
+ public long getCreationTimeMs() {
+ return _creationTimeMs;
}
public List<SegmentInstanceCandidate> getCandidates() {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index ddb83ce282..8206d49452 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -91,9 +91,9 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
*/
@Override
void updateSegmentMaps(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
- Map<String, Long> newSegmentPushTimeMap) {
+ Map<String, Long> newSegmentCreationTimeMap) {
_oldSegmentCandidatesMap.clear();
- int newSegmentMapCapacity =
HashUtil.getHashMapCapacity(newSegmentPushTimeMap.size());
+ int newSegmentMapCapacity =
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
_newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
@@ -113,7 +113,7 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
} else {
onlineInstances = getOnlineInstances(idealStateInstanceStateMap,
externalViewInstanceStateMap);
}
- if (newSegmentPushTimeMap.containsKey(segment)) {
+ if (newSegmentCreationTimeMap.containsKey(segment)) {
newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
} else {
oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
@@ -170,7 +170,7 @@ public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSele
candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance)));
}
}
- _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentPushTimeMap.get(segment), candidates));
+ _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
}
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
index 0ca3c3d317..3623955591 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java
@@ -31,10 +31,11 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
import
org.apache.pinot.broker.routing.segmentmetadata.SegmentZkMetadataFetchListener;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.routing.TablePartitionInfo;
import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.utils.CommonConstants;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPartitionMetadataManager.class);
private static final int INVALID_PARTITION_ID = -1;
- private static final long INVALID_PUSH_TIME_MS = -1L;
+ private static final long INVALID_CREATION_TIME_MS = -1L;
private final String _tableNameWithType;
@@ -81,7 +82,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
for (int i = 0; i < numSegments; i++) {
String segment = onlineSegments.get(i);
ZNRecord znRecord = znRecords.get(i);
- SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getPushTimeMs(znRecord),
+ SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getCreationTimeMs(znRecord),
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
@@ -111,17 +112,11 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
return partitions.iterator().next();
}
- private static long getPushTimeMs(@Nullable ZNRecord znRecord) {
+ private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {
if (znRecord == null) {
- return INVALID_PUSH_TIME_MS;
+ return INVALID_CREATION_TIME_MS;
}
- String pushTimeString =
znRecord.getSimpleField(CommonConstants.Segment.PUSH_TIME);
- // Handle legacy push time key
- if (pushTimeString == null) {
- pushTimeString =
znRecord.getSimpleField(CommonConstants.Segment.Offline.PUSH_TIME);
- }
- // Return INVALID_PUSH_TIME_MS if unavailable for backward compatibility
- return pushTimeString != null ? Long.parseLong(pushTimeString) :
INVALID_PUSH_TIME_MS;
+ return SegmentUtils.getSegmentCreationTimeMs(new
SegmentZKMetadata(znRecord));
}
private static List<String> getOnlineServers(ExternalView externalView,
String segment) {
@@ -153,7 +148,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
continue;
}
// Process new segments in the end
- if (InstanceSelector.isNewSegment(segmentInfo._pushTimeMs,
currentTimeMs)) {
+ if (InstanceSelector.isNewSegment(segmentInfo._creationTimeMs,
currentTimeMs)) {
newSegmentInfoEntries.add(entry);
continue;
}
@@ -165,18 +160,26 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
segments.add(segment);
partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
partitionInfoMap[partitionId] = partitionInfo;
+ if (onlineServers.isEmpty()) {
+ LOGGER.warn("Found segment: {} without any available replica in
table: {}, partition: {}", segment,
+ _tableNameWithType, partitionId);
+ }
} else {
- partitionInfo._fullyReplicatedServers.retainAll(onlineServers);
+ if (partitionInfo._fullyReplicatedServers.retainAll(onlineServers)) {
+ LOGGER.warn("Found segment: {} with online servers: {} that reduces
the fully replicated servers to: {} "
+ + "in table: {}, partition: {}", segment, onlineServers,
partitionInfo._fullyReplicatedServers,
+ _tableNameWithType, partitionId);
+ }
partitionInfo._segments.add(segment);
}
}
if (!segmentsWithInvalidPartition.isEmpty()) {
int numSegmentsWithInvalidPartition =
segmentsWithInvalidPartition.size();
if (numSegmentsWithInvalidPartition <= 10) {
- LOGGER.warn("Found {} segments: {} with invalid partition from table:
{}", numSegmentsWithInvalidPartition,
+ LOGGER.warn("Found {} segments: {} with invalid partition in table:
{}", numSegmentsWithInvalidPartition,
segmentsWithInvalidPartition, _tableNameWithType);
} else {
- LOGGER.warn("Found {} segments: {}... with invalid partition from
table: {}", numSegmentsWithInvalidPartition,
+ LOGGER.warn("Found {} segments: {}... with invalid partition in table:
{}", numSegmentsWithInvalidPartition,
segmentsWithInvalidPartition.subList(0, 10), _tableNameWithType);
}
}
@@ -190,17 +193,22 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
List<String> onlineServers = segmentInfo._onlineServers;
PartitionInfo partitionInfo = partitionInfoMap[partitionId];
if (partitionInfo == null) {
- // If the new segment is the first segment of a partition, treat it
as regular segment
- Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
- List<String> segments = new ArrayList<>();
- segments.add(segment);
- partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
- partitionInfoMap[partitionId] = partitionInfo;
+ // If the new segment is the first segment of a partition, treat it
as regular segment if it has available
+ // replicas
+ if (!onlineServers.isEmpty()) {
+ Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
+ List<String> segments = new ArrayList<>();
+ segments.add(segment);
+ partitionInfo = new PartitionInfo(fullyReplicatedServers,
segments);
+ partitionInfoMap[partitionId] = partitionInfo;
+ } else {
+ excludedNewSegments.add(segment);
+ }
} else {
// If the new segment is not the first segment of a partition, add
it only if it won't reduce the fully
- // replicated servers. It is common that a new segment (newly
pushed, or a new consuming segment) doesn't have
- // all the replicas available yet, and we want to exclude it from
the partition info until all the replicas
- // are available.
+ // replicated servers. It is common that a newly created segment
(newly pushed, or a new consuming segment)
+ // doesn't have all the replicas available yet, and we want to
exclude it from the partition info until all
+ // the replicas are available.
//noinspection SlowListContainsAll
if
(onlineServers.containsAll(partitionInfo._fullyReplicatedServers)) {
partitionInfo._segments.add(segment);
@@ -212,10 +220,10 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
if (!excludedNewSegments.isEmpty()) {
int numExcludedNewSegments = excludedNewSegments.size();
if (numExcludedNewSegments <= 10) {
- LOGGER.info("Excluded {} new segments: {} without all replicas
available from table: {}",
+ LOGGER.info("Excluded {} new segments: {} without all replicas
available in table: {}",
numExcludedNewSegments, excludedNewSegments, _tableNameWithType);
} else {
- LOGGER.info("Excluded {} new segments: {}... without all replicas
available from table: {}",
+ LOGGER.info("Excluded {} new segments: {}... without all replicas
available in table: {}",
numExcludedNewSegments, excludedNewSegments.subList(0, 10),
_tableNameWithType);
}
}
@@ -233,7 +241,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
for (int i = 0; i < numSegments; i++) {
String segment = pulledSegments.get(i);
ZNRecord znRecord = znRecords.get(i);
- SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getPushTimeMs(znRecord),
+ SegmentInfo segmentInfo = new SegmentInfo(getPartitionId(segment,
znRecord), getCreationTimeMs(znRecord),
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
}
@@ -245,7 +253,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
LOGGER.error("Failed to find segment info for segment: {} in table: {}
while handling assignment change",
segment, _tableNameWithType);
segmentInfo =
- new SegmentInfo(INVALID_PARTITION_ID, INVALID_PUSH_TIME_MS,
getOnlineServers(externalView, segment));
+ new SegmentInfo(INVALID_PARTITION_ID, INVALID_CREATION_TIME_MS,
getOnlineServers(externalView, segment));
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._onlineServers = getOnlineServers(externalView, segment);
@@ -258,7 +266,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
@Override
public synchronized void refreshSegment(String segment, @Nullable ZNRecord
znRecord) {
int partitionId = getPartitionId(segment, znRecord);
- long pushTimeMs = getPushTimeMs(znRecord);
+ long pushTimeMs = getCreationTimeMs(znRecord);
SegmentInfo segmentInfo = _segmentInfoMap.get(segment);
if (segmentInfo == null) {
// NOTE: This should not happen, but we still handle it gracefully by
adding an invalid SegmentInfo
@@ -268,7 +276,7 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
_segmentInfoMap.put(segment, segmentInfo);
} else {
segmentInfo._partitionId = partitionId;
- segmentInfo._pushTimeMs = pushTimeMs;
+ segmentInfo._creationTimeMs = pushTimeMs;
}
computeTablePartitionInfo();
}
@@ -279,12 +287,12 @@ public class SegmentPartitionMetadataManager implements
SegmentZkMetadataFetchLi
private static class SegmentInfo {
int _partitionId;
- long _pushTimeMs;
+ long _creationTimeMs;
List<String> _onlineServers;
- SegmentInfo(int partitionId, long pushTimeMs, List<String> onlineServers) {
+ SegmentInfo(int partitionId, long creationTimeMs, List<String>
onlineServers) {
_partitionId = partitionId;
- _pushTimeMs = pushTimeMs;
+ _creationTimeMs = creationTimeMs;
_onlineServers = onlineServers;
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index a9ff0f1819..fd2568e183 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -108,16 +108,17 @@ public class InstanceSelectorTest {
Arrays.asList("segment0", "segment1", "segment2", "segment3",
"segment4", "segment5", "segment6", "segment7",
"segment8", "segment9", "segment10", "segment11");
- private void createSegments(List<Pair<String, Long>> segmentPushMillis) {
+ private void createSegments(List<Pair<String, Long>>
segmentCreationTimeMsPairs) {
List<String> segmentZKMetadataPaths = new ArrayList<>();
List<ZNRecord> zkRecords = new ArrayList<>();
- for (Pair<String, Long> segment : segmentPushMillis) {
- SegmentZKMetadata offlineSegmentZKMetadata0 = new
SegmentZKMetadata(segment.getLeft());
- offlineSegmentZKMetadata0.setPushTime(segment.getRight());
+ for (Pair<String, Long> segmentCreationTimeMsPair :
segmentCreationTimeMsPairs) {
+ String segment = segmentCreationTimeMsPair.getLeft();
+ long creationTimeMs = segmentCreationTimeMsPair.getRight();
+ SegmentZKMetadata offlineSegmentZKMetadata0 = new
SegmentZKMetadata(segment);
+ offlineSegmentZKMetadata0.setCreationTime(creationTimeMs);
offlineSegmentZKMetadata0.setTimeUnit(TimeUnit.MILLISECONDS);
ZNRecord record = offlineSegmentZKMetadata0.toZNRecord();
- segmentZKMetadataPaths.add(
- ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME,
segment.getLeft()));
+
segmentZKMetadataPaths.add(ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME,
segment));
zkRecords.add(record);
}
when(_propertyStore.get(eq(segmentZKMetadataPaths), any(), anyInt(),
anyBoolean())).thenReturn(zkRecords);
@@ -1377,8 +1378,9 @@ public class InstanceSelectorTest {
public void testNewSegmentFromZKMetadataSelection(String selectorType) {
String oldSeg = "segment0";
String newSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(oldSeg, newSeg);
// Set up instances
@@ -1484,9 +1486,10 @@ public class InstanceSelectorTest {
// Set segment0 as new segment
String newSeg = "segment0";
String oldSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100),
- Pair.of(oldSeg, _mutableClock.millis() - NEW_SEGMENT_EXPIRATION_MILLIS
- 100));
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100),
+ Pair.of(oldSeg, _mutableClock.millis() -
NEW_SEGMENT_EXPIRATION_MILLIS - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(newSeg, oldSeg);
// Set up instances
@@ -1541,8 +1544,9 @@ public class InstanceSelectorTest {
String oldSeg = "segment0";
// Set segment1 as new segment
String newSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(oldSeg, newSeg);
// Set up instances
@@ -1621,8 +1625,9 @@ public class InstanceSelectorTest {
String oldSeg = "segment0";
// Set segment1 as new segment
String newSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(oldSeg, newSeg);
// Set up instances
@@ -1775,8 +1780,9 @@ public class InstanceSelectorTest {
String oldSeg = "segment0";
// Set segment1 as new segment
String newSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(oldSeg, newSeg);
// Set up instances
@@ -1844,10 +1850,10 @@ public class InstanceSelectorTest {
String oldSeg = "segment0";
// Set segment1 as new segment
String newSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime =
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
ImmutableList.of(Pair.of(oldSeg, _mutableClock.millis() -
NEW_SEGMENT_EXPIRATION_MILLIS - 100),
Pair.of(newSeg, _mutableClock.millis() - 100));
- createSegments(segmentPushTime);
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(oldSeg, newSeg);
// Set up instances
@@ -1893,11 +1899,10 @@ public class InstanceSelectorTest {
String newSeg = "segment0";
String oldSeg = "segment1";
- List<Pair<String, Long>> segmentPushTime = ImmutableList.of(
- Pair.of(newSeg, _mutableClock.millis() - 100),
- Pair.of(oldSeg, _mutableClock.millis() - NEW_SEGMENT_EXPIRATION_MILLIS
- 100));
-
- createSegments(segmentPushTime);
+ List<Pair<String, Long>> segmentCreationTimeMsPairs =
+ ImmutableList.of(Pair.of(newSeg, _mutableClock.millis() - 100),
+ Pair.of(oldSeg, _mutableClock.millis() -
NEW_SEGMENT_EXPIRATION_MILLIS - 100));
+ createSegments(segmentCreationTimeMsPairs);
Set<String> onlineSegments = ImmutableSet.of(newSeg, oldSeg);
// Set up instances
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
index 5d70a504fb..083a1a8548 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java
@@ -225,7 +225,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment2});
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
- // Updating the new segment to be replicated on 2 servers should add the
fully replicated server back
+ // Updating the segment to be replicated on 2 servers should add the fully
replicated server back
segmentAssignment.put(segment2, ImmutableMap.of(SERVER_0, ONLINE,
SERVER_1, ONLINE));
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView,
onlineSegments);
tablePartitionInfo = partitionMetadataManager.getTablePartitionInfo();
@@ -236,7 +236,7 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
assertEqualsNoOrder(partitionInfoMap[1]._segments.toArray(), new
String[]{segment1, segment2});
assertTrue(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty());
- // Adding a new segment without available replica should not update the
partition map
+ // Adding a newly created segment without available replica should not
update the partition map
String newSegment = "newSegment";
onlineSegments.add(newSegment);
setSegmentZKMetadata(newSegment, PARTITION_COLUMN_FUNC, NUM_PARTITIONS, 0,
System.currentTimeMillis());
@@ -265,11 +265,11 @@ public class SegmentPartitionMetadataManagerTest extends
ControllerTest {
}
private void setSegmentZKMetadata(String segment, String partitionFunction,
int numPartitions, int partitionId,
- long pushTimeMs) {
+ long creationTimeMs) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN,
new ColumnPartitionMetadata(partitionFunction, numPartitions,
Collections.singleton(partitionId), null))));
- segmentZKMetadata.setPushTime(pushTimeMs);
+ segmentZKMetadata.setCreationTime(creationTimeMs);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore,
OFFLINE_TABLE_NAME, segmentZKMetadata);
}
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index 085ad33daa..7ee35e7199 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -166,6 +166,10 @@ public class SegmentZKMetadata implements ZKMetadata {
setValue(Segment.TIER, tier);
}
+ /**
+ * For uploaded segments, this is the time when the segment file is created.
For real-time segments, this is the time
+ * when the consuming segment is created.
+ */
public long getCreationTime() {
return _znRecord.getLongField(Segment.CREATION_TIME, -1);
}
@@ -174,6 +178,10 @@ public class SegmentZKMetadata implements ZKMetadata {
setNonNegativeValue(Segment.CREATION_TIME, creationTime);
}
+ /**
+ * Push time exists only for uploaded segments. It is the time when the
segment is first pushed to the cluster (i.e.
+ * when the segment ZK metadata is created).
+ */
public long getPushTime() {
String pushTimeString = _simpleFields.get(Segment.PUSH_TIME);
// Handle legacy push time key
@@ -188,6 +196,10 @@ public class SegmentZKMetadata implements ZKMetadata {
setNonNegativeValue(Segment.PUSH_TIME, pushTime);
}
+ /**
+ * Refresh time exists only for uploaded segments that have been replaced.
It is the time when the segment is last
+ * replaced.
+ */
public long getRefreshTime() {
String refreshTimeString = _simpleFields.get(Segment.REFRESH_TIME);
// Handle legacy refresh time key
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index b458f5b511..9fd26c641a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -66,4 +66,20 @@ public class SegmentUtils {
}
return null;
}
+
+ /**
+ * Returns the creation time of a segment based on its ZK metadata. This is
the time when the segment is created in
+ * the cluster, instead of when the segment file is created.
+ * - For uploaded segments, creation time in ZK metadata is the time when
the segment file is created, use push time
+ * instead. Push time is the time a segment is first uploaded. When a
segment is refreshed, push time won't change.
+ * - For real-time segments (not uploaded), push time does not exist, use
creation time.
+ */
+ public static long getSegmentCreationTimeMs(SegmentZKMetadata
segmentZKMetadata) {
+ // Check push time first, then creation time
+ long pushTimeMs = segmentZKMetadata.getPushTime();
+ if (pushTimeMs > 0) {
+ return pushTimeMs;
+ }
+ return segmentZKMetadata.getCreationTime();
+ }
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
new file mode 100644
index 0000000000..203cc249d7
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentUtilsTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SegmentUtilsTest {
+ private static final String SEGMENT = "testSegment";
+
+ @Test
+ public void testGetSegmentCreationTimeMs() {
+ SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(SEGMENT);
+ segmentZKMetadata.setCreationTime(1000L);
+ assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata),
1000L);
+ segmentZKMetadata.setPushTime(2000L);
+ assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata),
2000L);
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index e8188b3987..481cc7d593 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -30,7 +30,6 @@ import java.util.Random;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.calcite.rel.hint.PinotHintOptions;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.core.routing.RoutingManager;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TablePartitionInfo;
@@ -498,8 +497,6 @@ public class WorkerManager {
List<String> candidateList = new ArrayList<>(candidates);
candidateList.sort(null);
int startIndex = (int) ((indexToPick & Long.MAX_VALUE) % numCandidates);
- String[] servers = candidates.toArray(new String[0]);
- ArrayUtils.shuffle(servers, RANDOM);
for (int i = 0; i < numCandidates; i++) {
String server = candidateList.get((startIndex + i) % numCandidates);
ServerInstance serverInstance = enabledServerInstanceMap.get(server);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]