This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2a2a2d1e43 make update segment.tier and rebalance table consistent on
segment's tier (#14516)
2a2a2d1e43 is described below
commit 2a2a2d1e4330f9b0e38d756e7d3bb4f894a8caf1
Author: Xiaobing <[email protected]>
AuthorDate: Thu Nov 21 13:24:16 2024 -0800
make update segment.tier and rebalance table consistent on segment's tier
(#14516)
---
.../org/apache/pinot/common/tier/TierFactory.java | 17 +++++++++++---
.../pinot/common/utils/config/TierConfigUtils.java | 15 ++++++++++---
.../pinot/common/tier/TierConfigUtilsTest.java | 22 +++++++++++++++---
.../helix/core/PinotHelixResourceManager.java | 26 ++++++++++++++--------
.../helix/core/rebalance/TableRebalancer.java | 16 ++++++++-----
.../PinotHelixResourceManagerStatelessTest.java | 25 +++++++++++++++++++--
6 files changed, 96 insertions(+), 25 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
b/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
index ae8b18147c..92c5d31d4f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
@@ -20,16 +20,20 @@ package org.apache.pinot.common.tier;
import com.google.common.collect.Sets;
import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.spi.config.table.TierConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory class to create and sort {@link Tier}
*/
public final class TierFactory {
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TierFactory.class);
public static final String TIME_SEGMENT_SELECTOR_TYPE = "time";
public static final String FIXED_SEGMENT_SELECTOR_TYPE = "fixed";
public static final String PINOT_SERVER_STORAGE_TYPE = "pinot_server";
@@ -41,11 +45,18 @@ public final class TierFactory {
* Constructs a {@link Tier} from the {@link TierConfig} in the table config
*/
public static Tier getTier(TierConfig tierConfig, HelixManager helixManager)
{
+ return getTier(tierConfig, helixManager, null);
+ }
+
+ public static Tier getTier(TierConfig tierConfig, HelixManager helixManager,
+ @Nullable Set<String> providedSegmentsForTier) {
TierSegmentSelector segmentSelector;
TierStorage storageSelector;
-
String segmentSelectorType = tierConfig.getSegmentSelectorType();
- if
(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
+ if (providedSegmentsForTier != null) {
+ LOGGER.debug("Provided segments: {} for tier: {}",
providedSegmentsForTier, tierConfig.getName());
+ segmentSelector = new FixedTierSegmentSelector(helixManager,
providedSegmentsForTier);
+ } else if
(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
segmentSelector = new TimeBasedTierSegmentSelector(helixManager,
tierConfig.getSegmentAge());
} else if
(segmentSelectorType.equalsIgnoreCase(TierFactory.FIXED_SEGMENT_SELECTOR_TYPE))
{
segmentSelector = new FixedTierSegmentSelector(helixManager,
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index d058934a32..4fd29b688a 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -68,8 +69,8 @@ public final class TierConfigUtils {
* @return InstancePartitions if the one can be derived from the given
sorted tiers, null otherwise
*/
@Nullable
- public static InstancePartitions
getTieredInstancePartitionsForSegment(String tableNameWithType,
- String segmentName, @Nullable List<Tier> sortedTiers, HelixManager
helixManager) {
+ public static InstancePartitions
getTieredInstancePartitionsForSegment(String tableNameWithType, String
segmentName,
+ @Nullable List<Tier> sortedTiers, HelixManager helixManager) {
if (CollectionUtils.isEmpty(sortedTiers)) {
return null;
}
@@ -139,10 +140,18 @@ public final class TierConfigUtils {
*/
public static List<Tier> getSortedTiersForStorageType(List<TierConfig>
tierConfigList, String storageType,
HelixManager helixManager) {
+ return getSortedTiersForStorageType(tierConfigList, storageType,
helixManager, null);
+ }
+
+ public static List<Tier> getSortedTiersForStorageType(List<TierConfig>
tierConfigList, String storageType,
+ HelixManager helixManager, @Nullable Map<String, Set<String>>
providedTierToSegmentsMap) {
List<Tier> sortedTiers = new ArrayList<>();
for (TierConfig tierConfig : tierConfigList) {
if (storageType.equalsIgnoreCase(tierConfig.getStorageType())) {
- sortedTiers.add(TierFactory.getTier(tierConfig, helixManager));
+ String tierName = tierConfig.getName();
+ Set<String> providedSegmentsForTier =
+ providedTierToSegmentsMap == null ? null :
providedTierToSegmentsMap.get(tierName);
+ sortedTiers.add(TierFactory.getTier(tierConfig, helixManager,
providedSegmentsForTier));
}
}
sortedTiers.sort(TierConfigUtils.getTierComparator());
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
index 75200fcb4e..5161bdbb9b 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -107,6 +108,14 @@ public class TierConfigUtilsTest {
Assert.assertEquals(tier.getStorage().getType(),
TierFactory.PINOT_SERVER_STORAGE_TYPE);
Assert.assertEquals(((PinotServerTierStorage)
tier.getStorage()).getServerTag(), "tier1_tag_OFFLINE");
+ // With provided segments, the time base selector is overwritten by a
fixed selector.
+ tier = TierFactory.getTier(tierConfig, null, Set.of("segment1",
"segment2"));
+ Assert.assertEquals(tier.getName(), "tier1");
+ Assert.assertTrue(tier.getSegmentSelector() instanceof
FixedTierSegmentSelector);
+ Assert.assertEquals(tier.getSegmentSelector().getType(),
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
+ Assert.assertEquals(((FixedTierSegmentSelector)
tier.getSegmentSelector()).getSegmentsToSelect(),
+ Sets.newHashSet("segment1", "segment2"));
+
tierConfig = new TierConfig("tier1",
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
Lists.newArrayList("segment1", "segment2", "segment3"),
TierFactory.PINOT_SERVER_STORAGE_TYPE,
"tier1_tag_OFFLINE", null, null);
@@ -117,9 +126,16 @@ public class TierConfigUtilsTest {
Assert.assertEquals(((FixedTierSegmentSelector)
tier.getSegmentSelector()).getSegmentsToSelect(),
Sets.newHashSet("segment1", "segment2", "segment3"));
- tierConfig = new TierConfig("tier1",
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
- null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
- "tier1_tag_OFFLINE", null, null);
+ // With provided segments, the fixed selector can be overwritten with
different set of segments.
+ tier = TierFactory.getTier(tierConfig, null, Set.of("segment1a",
"segment2b"));
+ Assert.assertEquals(tier.getName(), "tier1");
+ Assert.assertTrue(tier.getSegmentSelector() instanceof
FixedTierSegmentSelector);
+ Assert.assertEquals(tier.getSegmentSelector().getType(),
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
+ Assert.assertEquals(((FixedTierSegmentSelector)
tier.getSegmentSelector()).getSegmentsToSelect(),
+ Sets.newHashSet("segment1a", "segment2b"));
+
+ tierConfig = new TierConfig("tier1",
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null, null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null,
null);
tier = TierFactory.getTier(tierConfig, null);
Assert.assertEquals(tier.getName(), "tier1");
Assert.assertTrue(tier.getSegmentSelector() instanceof
FixedTierSegmentSelector);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e7affa4287..1f1e95877f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3520,12 +3520,13 @@ public class PinotHelixResourceManager {
public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig
tableConfig, String rebalanceJobId,
RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver
zkBasedTableRebalanceObserver) {
+ Map<String, Set<String>> tierToSegmentsMap = null;
if (rebalanceConfig.isUpdateTargetTier()) {
- updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
+ tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType,
tableConfig);
}
TableRebalancer tableRebalancer =
new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics);
- return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId);
+ return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId, tierToSegmentsMap);
}
/**
@@ -3533,22 +3534,28 @@ public class PinotHelixResourceManager {
* checked by servers when loading the segment to put it onto the target
storage tier.
*/
@VisibleForTesting
- void updateTargetTier(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig) {
+ Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String
tableNameWithType, TableConfig tableConfig) {
List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
List<Tier> sortedTiers =
- tierCfgs == null ? Collections.emptyList() :
TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager);
+ CollectionUtils.isNotEmpty(tierCfgs) ?
TierConfigUtils.getSortedTiersForStorageType(tierCfgs,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager) :
Collections.emptyList();
LOGGER.info("For rebalanceId: {}, updating target tiers for segments of
table: {} with tierConfigs: {}",
rebalanceJobId, tableNameWithType, sortedTiers);
+ Map<String, Set<String>> tierToSegmentsMap = new HashMap<>();
for (String segmentName : getSegmentsFor(tableNameWithType, true)) {
- updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+ String tier = updateSegmentTargetTier(tableNameWithType, segmentName,
sortedTiers);
+ if (tier != null) {
+ tierToSegmentsMap.computeIfAbsent(tier, t -> new
HashSet<>()).add(segmentName);
+ }
}
+ return tierToSegmentsMap;
}
- private void updateSegmentTargetTier(String tableNameWithType, String
segmentName, List<Tier> sortedTiers) {
+ private String updateSegmentTargetTier(String tableNameWithType, String
segmentName, List<Tier> sortedTiers) {
ZNRecord segmentMetadataZNRecord =
getSegmentMetadataZnRecord(tableNameWithType, segmentName);
if (segmentMetadataZNRecord == null) {
LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName,
tableNameWithType);
- return;
+ return null;
}
Tier targetTier = null;
for (Tier tier : sortedTiers) {
@@ -3563,7 +3570,7 @@ public class PinotHelixResourceManager {
if (targetTier == null) {
if (segmentZKMetadata.getTier() == null) {
LOGGER.debug("Segment: {} of table: {} is already set to go to default
tier", segmentName, tableNameWithType);
- return;
+ return null;
}
LOGGER.info("Segment: {} of table: {} is put back on default tier",
segmentName, tableNameWithType);
} else {
@@ -3571,13 +3578,14 @@ public class PinotHelixResourceManager {
if (targetTierName.equals(segmentZKMetadata.getTier())) {
LOGGER.debug("Segment: {} of table: {} is already set to go to target
tier: {}", segmentName, tableNameWithType,
targetTierName);
- return;
+ return targetTierName;
}
LOGGER.info("Segment: {} of table: {} is put onto new tier: {}",
segmentName, tableNameWithType, targetTierName);
}
// Update the tier in segment ZK metadata and write it back to ZK.
segmentZKMetadata.setTier(targetTierName);
updateZkMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadataZNRecord.getVersion());
+ return targetTierName;
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 395edf0827..facd904f21 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -142,11 +142,16 @@ public class TableRebalancer {
public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig,
@Nullable String rebalanceJobId) {
+ return rebalance(tableConfig, rebalanceConfig, rebalanceJobId, null);
+ }
+
+ public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig,
+ @Nullable String rebalanceJobId, @Nullable Map<String, Set<String>>
providedTierToSegmentsMap) {
long startTime = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
RebalanceResult.Status status = RebalanceResult.Status.UNKNOWN_ERROR;
try {
- RebalanceResult result = doRebalance(tableConfig, rebalanceConfig,
rebalanceJobId);
+ RebalanceResult result = doRebalance(tableConfig, rebalanceConfig,
rebalanceJobId, providedTierToSegmentsMap);
status = result.getStatus();
return result;
} finally {
@@ -159,7 +164,7 @@ public class TableRebalancer {
}
private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig
rebalanceConfig,
- @Nullable String rebalanceJobId) {
+ @Nullable String rebalanceJobId, @Nullable Map<String, Set<String>>
providedTierToSegmentsMap) {
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
if (rebalanceJobId == null) {
@@ -238,7 +243,7 @@ public class TableRebalancer {
Map<String, InstancePartitions> tierToInstancePartitionsMap;
boolean tierInstancePartitionsUnchanged;
try {
- sortedTiers = getSortedTiers(tableConfig);
+ sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
Pair<Map<String, InstancePartitions>, Boolean>
tierToInstancePartitionsMapAndUnchanged =
getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
tierToInstancePartitionsMap =
tierToInstancePartitionsMapAndUnchanged.getLeft();
@@ -671,13 +676,14 @@ public class TableRebalancer {
}
@Nullable
- private List<Tier> getSortedTiers(TableConfig tableConfig) {
+ private List<Tier> getSortedTiers(TableConfig tableConfig,
+ @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
List<TierConfig> tierConfigs = tableConfig.getTierConfigsList();
if (CollectionUtils.isNotEmpty(tierConfigs)) {
// Get tiers with storageType = "PINOT_SERVER". This is the only type
available right now.
// Other types should be treated differently
return TierConfigUtils.getSortedTiersForStorageType(tierConfigs,
TierFactory.PINOT_SERVER_STORAGE_TYPE,
- _helixManager);
+ _helixManager, providedTierToSegmentsMap);
} else {
return null;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 902389bef4..095865d923 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -920,21 +920,42 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
assertNull(segmentZKMetadata.getTier());
// Move on to new tier
- _helixResourceManager.updateTargetTier("j1", tableConfig.getTableName(),
tableConfig);
+ Map<String, Set<String>> tierToSegmentsMap =
+ _helixResourceManager.updateTargetTier("j1",
tableConfig.getTableName(), tableConfig);
List<SegmentZKMetadata> retrievedSegmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
SegmentZKMetadata retrievedSegmentZKMetadata =
retrievedSegmentsZKMetadata.get(0);
assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
+ assertEquals(tierToSegmentsMap.size(), 1);
+ assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));
+
+ // No tier move
+ tierToSegmentsMap =
+ _helixResourceManager.updateTargetTier("j11",
tableConfig.getTableName(), tableConfig);
+ retrievedSegmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+ retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+ assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
+ assertEquals(tierToSegmentsMap.size(), 1);
+ assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));
// Move back to default tier
tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
.setServerTenant(SERVER_TENANT_NAME).build();
_helixResourceManager.updateTableConfig(tableConfig);
- _helixResourceManager.updateTargetTier("j2", tableConfig.getTableName(),
tableConfig);
+ tierToSegmentsMap = _helixResourceManager.updateTargetTier("j2",
tableConfig.getTableName(), tableConfig);
+ retrievedSegmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+ retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+ assertNull(retrievedSegmentZKMetadata.getTier());
+ assertTrue(tierToSegmentsMap.isEmpty());
+
+ // No tier move
+ tierToSegmentsMap =
+ _helixResourceManager.updateTargetTier("j22",
tableConfig.getTableName(), tableConfig);
retrievedSegmentsZKMetadata =
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
assertNull(retrievedSegmentZKMetadata.getTier());
+ assertTrue(tierToSegmentsMap.isEmpty());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org