This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a2a2d1e43 make update segment.tier and rebalance table consistent on segment's tier (#14516)
2a2a2d1e43 is described below

commit 2a2a2d1e4330f9b0e38d756e7d3bb4f894a8caf1
Author: Xiaobing <[email protected]>
AuthorDate: Thu Nov 21 13:24:16 2024 -0800

    make update segment.tier and rebalance table consistent on segment's tier (#14516)
---
 .../org/apache/pinot/common/tier/TierFactory.java  | 17 +++++++++++---
 .../pinot/common/utils/config/TierConfigUtils.java | 15 ++++++++++---
 .../pinot/common/tier/TierConfigUtilsTest.java     | 22 +++++++++++++++---
 .../helix/core/PinotHelixResourceManager.java      | 26 ++++++++++++++--------
 .../helix/core/rebalance/TableRebalancer.java      | 16 ++++++++-----
 .../PinotHelixResourceManagerStatelessTest.java    | 25 +++++++++++++++++++--
 6 files changed, 96 insertions(+), 25 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java 
b/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
index ae8b18147c..92c5d31d4f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/TierFactory.java
@@ -20,16 +20,20 @@ package org.apache.pinot.common.tier;
 
 import com.google.common.collect.Sets;
 import java.util.Collections;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.spi.config.table.TierConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Factory class to create and sort {@link Tier}
  */
 public final class TierFactory {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TierFactory.class);
   public static final String TIME_SEGMENT_SELECTOR_TYPE = "time";
   public static final String FIXED_SEGMENT_SELECTOR_TYPE = "fixed";
   public static final String PINOT_SERVER_STORAGE_TYPE = "pinot_server";
@@ -41,11 +45,18 @@ public final class TierFactory {
    * Constructs a {@link Tier} from the {@link TierConfig} in the table config
    */
   public static Tier getTier(TierConfig tierConfig, HelixManager helixManager) 
{
+    return getTier(tierConfig, helixManager, null);
+  }
+
+  public static Tier getTier(TierConfig tierConfig, HelixManager helixManager,
+      @Nullable Set<String> providedSegmentsForTier) {
     TierSegmentSelector segmentSelector;
     TierStorage storageSelector;
-
     String segmentSelectorType = tierConfig.getSegmentSelectorType();
-    if 
(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
+    if (providedSegmentsForTier != null) {
+      LOGGER.debug("Provided segments: {} for tier: {}", 
providedSegmentsForTier, tierConfig.getName());
+      segmentSelector = new FixedTierSegmentSelector(helixManager, 
providedSegmentsForTier);
+    } else if 
(segmentSelectorType.equalsIgnoreCase(TierFactory.TIME_SEGMENT_SELECTOR_TYPE)) {
       segmentSelector = new TimeBasedTierSegmentSelector(helixManager, 
tierConfig.getSegmentAge());
     } else if 
(segmentSelectorType.equalsIgnoreCase(TierFactory.FIXED_SEGMENT_SELECTOR_TYPE)) 
{
       segmentSelector = new FixedTierSegmentSelector(helixManager,
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
index d058934a32..4fd29b688a 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TierConfigUtils.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -68,8 +69,8 @@ public final class TierConfigUtils {
    * @return InstancePartitions if the one can be derived from the given 
sorted tiers, null otherwise
    */
   @Nullable
-  public static InstancePartitions 
getTieredInstancePartitionsForSegment(String tableNameWithType,
-      String segmentName, @Nullable List<Tier> sortedTiers, HelixManager 
helixManager) {
+  public static InstancePartitions 
getTieredInstancePartitionsForSegment(String tableNameWithType, String 
segmentName,
+      @Nullable List<Tier> sortedTiers, HelixManager helixManager) {
     if (CollectionUtils.isEmpty(sortedTiers)) {
       return null;
     }
@@ -139,10 +140,18 @@ public final class TierConfigUtils {
    */
   public static List<Tier> getSortedTiersForStorageType(List<TierConfig> 
tierConfigList, String storageType,
       HelixManager helixManager) {
+    return getSortedTiersForStorageType(tierConfigList, storageType, 
helixManager, null);
+  }
+
+  public static List<Tier> getSortedTiersForStorageType(List<TierConfig> 
tierConfigList, String storageType,
+      HelixManager helixManager, @Nullable Map<String, Set<String>> 
providedTierToSegmentsMap) {
     List<Tier> sortedTiers = new ArrayList<>();
     for (TierConfig tierConfig : tierConfigList) {
       if (storageType.equalsIgnoreCase(tierConfig.getStorageType())) {
-        sortedTiers.add(TierFactory.getTier(tierConfig, helixManager));
+        String tierName = tierConfig.getName();
+        Set<String> providedSegmentsForTier =
+            providedTierToSegmentsMap == null ? null : 
providedTierToSegmentsMap.get(tierName);
+        sortedTiers.add(TierFactory.getTier(tierConfig, helixManager, 
providedSegmentsForTier));
       }
     }
     sortedTiers.sort(TierConfigUtils.getTierComparator());
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
index 75200fcb4e..5161bdbb9b 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/tier/TierConfigUtilsTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -107,6 +108,14 @@ public class TierConfigUtilsTest {
     Assert.assertEquals(tier.getStorage().getType(), 
TierFactory.PINOT_SERVER_STORAGE_TYPE);
     Assert.assertEquals(((PinotServerTierStorage) 
tier.getStorage()).getServerTag(), "tier1_tag_OFFLINE");
 
+    // With provided segments, the time base selector is overwritten by a 
fixed selector.
+    tier = TierFactory.getTier(tierConfig, null, Set.of("segment1", 
"segment2"));
+    Assert.assertEquals(tier.getName(), "tier1");
+    Assert.assertTrue(tier.getSegmentSelector() instanceof 
FixedTierSegmentSelector);
+    Assert.assertEquals(tier.getSegmentSelector().getType(), 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
+    Assert.assertEquals(((FixedTierSegmentSelector) 
tier.getSegmentSelector()).getSegmentsToSelect(),
+        Sets.newHashSet("segment1", "segment2"));
+
     tierConfig = new TierConfig("tier1", 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
         Lists.newArrayList("segment1", "segment2", "segment3"), 
TierFactory.PINOT_SERVER_STORAGE_TYPE,
         "tier1_tag_OFFLINE", null, null);
@@ -117,9 +126,16 @@ public class TierConfigUtilsTest {
     Assert.assertEquals(((FixedTierSegmentSelector) 
tier.getSegmentSelector()).getSegmentsToSelect(),
         Sets.newHashSet("segment1", "segment2", "segment3"));
 
-    tierConfig = new TierConfig("tier1", 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null,
-        null, TierFactory.PINOT_SERVER_STORAGE_TYPE,
-        "tier1_tag_OFFLINE", null, null);
+    // With provided segments, the fixed selector can be overwritten with 
different set of segments.
+    tier = TierFactory.getTier(tierConfig, null, Set.of("segment1a", 
"segment2b"));
+    Assert.assertEquals(tier.getName(), "tier1");
+    Assert.assertTrue(tier.getSegmentSelector() instanceof 
FixedTierSegmentSelector);
+    Assert.assertEquals(tier.getSegmentSelector().getType(), 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE);
+    Assert.assertEquals(((FixedTierSegmentSelector) 
tier.getSegmentSelector()).getSegmentsToSelect(),
+        Sets.newHashSet("segment1a", "segment2b"));
+
+    tierConfig = new TierConfig("tier1", 
TierFactory.FIXED_SEGMENT_SELECTOR_TYPE, null, null,
+        TierFactory.PINOT_SERVER_STORAGE_TYPE, "tier1_tag_OFFLINE", null, 
null);
     tier = TierFactory.getTier(tierConfig, null);
     Assert.assertEquals(tier.getName(), "tier1");
     Assert.assertTrue(tier.getSegmentSelector() instanceof 
FixedTierSegmentSelector);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e7affa4287..1f1e95877f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -3520,12 +3520,13 @@ public class PinotHelixResourceManager {
 
   public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig 
tableConfig, String rebalanceJobId,
       RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver 
zkBasedTableRebalanceObserver) {
+    Map<String, Set<String>> tierToSegmentsMap = null;
     if (rebalanceConfig.isUpdateTargetTier()) {
-      updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig);
+      tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, 
tableConfig);
     }
     TableRebalancer tableRebalancer =
         new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics);
-    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId);
+    return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
   }
 
   /**
@@ -3533,22 +3534,28 @@ public class PinotHelixResourceManager {
    * checked by servers when loading the segment to put it onto the target 
storage tier.
    */
   @VisibleForTesting
-  void updateTargetTier(String rebalanceJobId, String tableNameWithType, 
TableConfig tableConfig) {
+  Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String 
tableNameWithType, TableConfig tableConfig) {
     List<TierConfig> tierCfgs = tableConfig.getTierConfigsList();
     List<Tier> sortedTiers =
-        tierCfgs == null ? Collections.emptyList() : 
TierConfigUtils.getSortedTiers(tierCfgs, _helixZkManager);
+        CollectionUtils.isNotEmpty(tierCfgs) ? 
TierConfigUtils.getSortedTiersForStorageType(tierCfgs,
+            TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager) : 
Collections.emptyList();
     LOGGER.info("For rebalanceId: {}, updating target tiers for segments of 
table: {} with tierConfigs: {}",
         rebalanceJobId, tableNameWithType, sortedTiers);
+    Map<String, Set<String>> tierToSegmentsMap = new HashMap<>();
     for (String segmentName : getSegmentsFor(tableNameWithType, true)) {
-      updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers);
+      String tier = updateSegmentTargetTier(tableNameWithType, segmentName, 
sortedTiers);
+      if (tier != null) {
+        tierToSegmentsMap.computeIfAbsent(tier, t -> new 
HashSet<>()).add(segmentName);
+      }
     }
+    return tierToSegmentsMap;
   }
 
-  private void updateSegmentTargetTier(String tableNameWithType, String 
segmentName, List<Tier> sortedTiers) {
+  private String updateSegmentTargetTier(String tableNameWithType, String 
segmentName, List<Tier> sortedTiers) {
     ZNRecord segmentMetadataZNRecord = 
getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     if (segmentMetadataZNRecord == null) {
       LOGGER.debug("No ZK metadata for segment: {} of table: {}", segmentName, 
tableNameWithType);
-      return;
+      return null;
     }
     Tier targetTier = null;
     for (Tier tier : sortedTiers) {
@@ -3563,7 +3570,7 @@ public class PinotHelixResourceManager {
     if (targetTier == null) {
       if (segmentZKMetadata.getTier() == null) {
         LOGGER.debug("Segment: {} of table: {} is already set to go to default 
tier", segmentName, tableNameWithType);
-        return;
+        return null;
       }
       LOGGER.info("Segment: {} of table: {} is put back on default tier", 
segmentName, tableNameWithType);
     } else {
@@ -3571,13 +3578,14 @@ public class PinotHelixResourceManager {
       if (targetTierName.equals(segmentZKMetadata.getTier())) {
         LOGGER.debug("Segment: {} of table: {} is already set to go to target 
tier: {}", segmentName, tableNameWithType,
             targetTierName);
-        return;
+        return targetTierName;
       }
       LOGGER.info("Segment: {} of table: {} is put onto new tier: {}", 
segmentName, tableNameWithType, targetTierName);
     }
     // Update the tier in segment ZK metadata and write it back to ZK.
     segmentZKMetadata.setTier(targetTierName);
     updateZkMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadataZNRecord.getVersion());
+    return targetTierName;
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 395edf0827..facd904f21 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -142,11 +142,16 @@ public class TableRebalancer {
 
   public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig,
       @Nullable String rebalanceJobId) {
+    return rebalance(tableConfig, rebalanceConfig, rebalanceJobId, null);
+  }
+
+  public RebalanceResult rebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig,
+      @Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> 
providedTierToSegmentsMap) {
     long startTime = System.currentTimeMillis();
     String tableNameWithType = tableConfig.getTableName();
     RebalanceResult.Status status = RebalanceResult.Status.UNKNOWN_ERROR;
     try {
-      RebalanceResult result = doRebalance(tableConfig, rebalanceConfig, 
rebalanceJobId);
+      RebalanceResult result = doRebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, providedTierToSegmentsMap);
       status = result.getStatus();
       return result;
     } finally {
@@ -159,7 +164,7 @@ public class TableRebalancer {
   }
 
   private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig 
rebalanceConfig,
-      @Nullable String rebalanceJobId) {
+      @Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> 
providedTierToSegmentsMap) {
     long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = tableConfig.getTableName();
     if (rebalanceJobId == null) {
@@ -238,7 +243,7 @@ public class TableRebalancer {
     Map<String, InstancePartitions> tierToInstancePartitionsMap;
     boolean tierInstancePartitionsUnchanged;
     try {
-      sortedTiers = getSortedTiers(tableConfig);
+      sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
       Pair<Map<String, InstancePartitions>, Boolean> 
tierToInstancePartitionsMapAndUnchanged =
           getTierToInstancePartitionsMap(tableConfig, sortedTiers, 
reassignInstances, bootstrap, dryRun);
       tierToInstancePartitionsMap = 
tierToInstancePartitionsMapAndUnchanged.getLeft();
@@ -671,13 +676,14 @@ public class TableRebalancer {
   }
 
   @Nullable
-  private List<Tier> getSortedTiers(TableConfig tableConfig) {
+  private List<Tier> getSortedTiers(TableConfig tableConfig,
+      @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
     List<TierConfig> tierConfigs = tableConfig.getTierConfigsList();
     if (CollectionUtils.isNotEmpty(tierConfigs)) {
       // Get tiers with storageType = "PINOT_SERVER". This is the only type 
available right now.
       // Other types should be treated differently
       return TierConfigUtils.getSortedTiersForStorageType(tierConfigs, 
TierFactory.PINOT_SERVER_STORAGE_TYPE,
-          _helixManager);
+          _helixManager, providedTierToSegmentsMap);
     } else {
       return null;
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 902389bef4..095865d923 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -920,21 +920,42 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     assertNull(segmentZKMetadata.getTier());
 
     // Move on to new tier
-    _helixResourceManager.updateTargetTier("j1", tableConfig.getTableName(), 
tableConfig);
+    Map<String, Set<String>> tierToSegmentsMap =
+        _helixResourceManager.updateTargetTier("j1", 
tableConfig.getTableName(), tableConfig);
     List<SegmentZKMetadata> retrievedSegmentsZKMetadata =
         _helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
     SegmentZKMetadata retrievedSegmentZKMetadata = 
retrievedSegmentsZKMetadata.get(0);
     assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
+    assertEquals(tierToSegmentsMap.size(), 1);
+    assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));
+
+    // No tier move
+    tierToSegmentsMap =
+        _helixResourceManager.updateTargetTier("j11", 
tableConfig.getTableName(), tableConfig);
+    retrievedSegmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+    retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+    assertEquals(retrievedSegmentZKMetadata.getTier(), "tier1");
+    assertEquals(tierToSegmentsMap.size(), 1);
+    assertEquals(tierToSegmentsMap.get("tier1"), Set.of("testSegment"));
 
     // Move back to default tier
     tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
             .setServerTenant(SERVER_TENANT_NAME).build();
     _helixResourceManager.updateTableConfig(tableConfig);
-    _helixResourceManager.updateTargetTier("j2", tableConfig.getTableName(), 
tableConfig);
+    tierToSegmentsMap = _helixResourceManager.updateTargetTier("j2", 
tableConfig.getTableName(), tableConfig);
+    retrievedSegmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
+    retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
+    assertNull(retrievedSegmentZKMetadata.getTier());
+    assertTrue(tierToSegmentsMap.isEmpty());
+
+    // No tier move
+    tierToSegmentsMap =
+        _helixResourceManager.updateTargetTier("j22", 
tableConfig.getTableName(), tableConfig);
     retrievedSegmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME);
     retrievedSegmentZKMetadata = retrievedSegmentsZKMetadata.get(0);
     assertNull(retrievedSegmentZKMetadata.getTier());
+    assertTrue(tierToSegmentsMap.isEmpty());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to