This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch Computing-resource-balancing_cp
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b2dd4d1b124aed464e365d8b082ddaeda3307c69
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Jul 31 16:28:26 2023 +0800

    resolve conversations
---
 .../partition/IoTDBPartitionInheritPolicyIT.java   |  5 ++
 .../iotdb/confignode/manager/load/LoadManager.java |  4 +-
 .../manager/load/balancer/PartitionBalancer.java   | 27 ++++---
 .../persistence/partition/PartitionInfo.java       |  3 +-
 .../impl/schema/DeleteDatabaseProcedure.java       |  4 +-
 .../commons/partition/DataPartitionEntry.java      | 87 ----------------------
 .../iotdb/commons/structure/BalanceTreeMap.java    | 38 ++++++----
 .../commons/partition/DataPartitionEntryTest.java  | 58 ---------------
 8 files changed, 49 insertions(+), 177 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index bdb233950e2..2efd5db7a0b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
@@ -170,6 +171,7 @@ public class IoTDBPartitionInheritPolicyIT {
     counter.forEach((groupId, num) -> 
Assert.assertEquals(expectedPartitionNum2, num.intValue()));
 
     // Test DataPartition inherit policy
+    AtomicInteger inheritedSeriesSlotNum = new AtomicInteger(0);
     Map<TSeriesPartitionSlot, TConsensusGroupId> dataAllotTable2 = new 
ConcurrentHashMap<>();
     dataPartitionTableResp
         .getDataPartitionTable()
@@ -193,9 +195,12 @@ public class IoTDBPartitionInheritPolicyIT {
                 // The DataRegionGroup has been inherited
                 
Assert.assertTrue(dataAllotTable1.containsKey(seriesPartitionSlot));
                 Assert.assertEquals(dataAllotTable1.get(seriesPartitionSlot), 
groupId);
+                inheritedSeriesSlotNum.incrementAndGet();
               }
               dataAllotTable2.put(seriesPartitionSlot, groupId);
             }));
+    // Exactly half of the SeriesSlots are inherited
+    Assert.assertEquals(testSeriesSlotNum / 2, inheritedSeriesSlotNum.get());
 
     // Test3: historical DataPartitions will inherit successor
     Random random = new Random();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index e3472e2e0f2..b4c291c97e9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -152,8 +152,8 @@ public class LoadManager {
     partitionBalancer.clearPartitionBalancer();
   }
 
-  public void clearPartitionBalancer() {
-    partitionBalancer.clearPartitionBalancer();
+  public void clearDataPartitionPolicyTable(String database) {
+    partitionBalancer.clearDataPartitionPolicyTable(database);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
index 42ac541655c..78be7046de5 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java
@@ -40,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -55,11 +56,11 @@ public class PartitionBalancer {
   private final IManager configManager;
 
   // Map<DatabaseName, DataPartitionPolicyTable>
-  private final Map<String, DataPartitionPolicyTable> dataAllotTableMap;
+  private final Map<String, DataPartitionPolicyTable> 
dataPartitionPolicyTableMap;
 
   public PartitionBalancer(IManager configManager) {
     this.configManager = configManager;
-    this.dataAllotTableMap = new ConcurrentHashMap<>();
+    this.dataPartitionPolicyTableMap = new ConcurrentHashMap<>();
   }
 
   /**
@@ -71,7 +72,7 @@ public class PartitionBalancer {
   public Map<String, SchemaPartitionTable> allocateSchemaPartition(
       Map<String, List<TSeriesPartitionSlot>> 
unassignedSchemaPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    Map<String, SchemaPartitionTable> result = new ConcurrentHashMap<>();
+    Map<String, SchemaPartitionTable> result = new HashMap<>();
 
     for (Map.Entry<String, List<TSeriesPartitionSlot>> slotsMapEntry :
         unassignedSchemaPartitionSlotsMap.entrySet()) {
@@ -89,7 +90,7 @@ public class PartitionBalancer {
       }
 
       // Enumerate SeriesPartitionSlot
-      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new 
ConcurrentHashMap<>();
+      Map<TSeriesPartitionSlot, TConsensusGroupId> schemaPartitionMap = new 
HashMap<>();
       for (TSeriesPartitionSlot seriesPartitionSlot : 
unassignedPartitionSlots) {
         // Greedy allocation: allocate the unassigned SchemaPartition to
         // the RegionGroup whose allocated SchemaPartitions is the least
@@ -112,7 +113,7 @@ public class PartitionBalancer {
   public Map<String, DataPartitionTable> allocateDataPartition(
       Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
unassignedDataPartitionSlotsMap)
       throws NoAvailableRegionGroupException {
-    Map<String, DataPartitionTable> result = new ConcurrentHashMap<>();
+    Map<String, DataPartitionTable> result = new HashMap<>();
 
     for (Map.Entry<String, Map<TSeriesPartitionSlot, TTimeSlotList>> 
slotsMapEntry :
         unassignedDataPartitionSlotsMap.entrySet()) {
@@ -130,7 +131,7 @@ public class PartitionBalancer {
         counter.put(pair.getRight(), pair.getLeft().intValue());
       }
 
-      DataPartitionPolicyTable allotTable = dataAllotTableMap.get(database);
+      DataPartitionPolicyTable allotTable = 
dataPartitionPolicyTableMap.get(database);
       allotTable.acquireLock();
       DataPartitionTable dataPartitionTable = new DataPartitionTable();
 
@@ -205,7 +206,7 @@ public class PartitionBalancer {
    */
   public void reBalanceDataPartitionPolicy(String database) {
     try {
-      dataAllotTableMap
+      dataPartitionPolicyTableMap
           .computeIfAbsent(database, empty -> new DataPartitionPolicyTable())
           .reBalanceDataPartitionPolicy(
               getPartitionManager().getAllRegionGroupIds(database, 
TConsensusGroupType.DataRegion));
@@ -216,13 +217,13 @@ public class PartitionBalancer {
 
   /** Set up the PartitionBalancer when the current ConfigNode becomes leader. 
*/
   public void setupPartitionBalancer() {
-    dataAllotTableMap.clear();
+    dataPartitionPolicyTableMap.clear();
     getClusterSchemaManager()
         .getDatabaseNames()
         .forEach(
             database -> {
-              dataAllotTableMap.put(database, new DataPartitionPolicyTable());
-              DataPartitionPolicyTable dataPartitionPolicyTable = 
dataAllotTableMap.get(database);
+              DataPartitionPolicyTable dataPartitionPolicyTable = new 
DataPartitionPolicyTable();
+              dataPartitionPolicyTableMap.put(database, 
dataPartitionPolicyTable);
               try {
                 // Put all DataRegionGroups into the DataPartitionPolicyTable
                 dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
@@ -239,7 +240,11 @@ public class PartitionBalancer {
 
   /** Clear the PartitionBalancer when the current ConfigNode is no longer the 
leader. */
   public void clearPartitionBalancer() {
-    dataAllotTableMap.clear();
+    dataPartitionPolicyTableMap.clear();
+  }
+
+  public void clearDataPartitionPolicyTable(String database) {
+    dataPartitionPolicyTableMap.remove(database);
   }
 
   private ClusterSchemaManager getClusterSchemaManager() {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index 66ab605e066..d61396a4c7a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -79,7 +79,6 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -832,7 +831,7 @@ public class PartitionInfo implements SnapshotProcessor {
     if (isDatabaseExisted(database)) {
       return databasePartitionTables.get(database).getLastDataAllotTable();
     }
-    return new HashMap<>();
+    return Collections.emptyMap();
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 864f7a3c186..7ea774cc33c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -206,7 +206,9 @@ public class DeleteDatabaseProcedure
             LOG.info(
                 "[DeleteDatabaseProcedure] Database: {} is deleted 
successfully",
                 deleteDatabaseSchema.getName());
-            env.getConfigManager().getLoadManager().clearPartitionBalancer();
+            env.getConfigManager()
+                .getLoadManager()
+                .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName());
             return Flow.NO_MORE_STATE;
           } else if (getCycles() > RETRY_THRESHOLD) {
             setFailure(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
deleted file mode 100644
index e94f6eed085..00000000000
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionEntry.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-
-import java.security.SecureRandom;
-import java.util.Objects;
-
-public class DataPartitionEntry implements Comparable<DataPartitionEntry> {
-
-  private final TSeriesPartitionSlot seriesPartitionSlot;
-  private final TTimePartitionSlot timePartitionSlot;
-  private final TConsensusGroupId dataRegionGroup;
-  private final int weight;
-
-  public DataPartitionEntry(
-      TSeriesPartitionSlot seriesPartitionSlot,
-      TTimePartitionSlot timePartitionSlot,
-      TConsensusGroupId dataRegionGroup) {
-    this.seriesPartitionSlot = seriesPartitionSlot;
-    this.timePartitionSlot = timePartitionSlot;
-    this.dataRegionGroup = dataRegionGroup;
-    this.weight = new SecureRandom().nextInt();
-  }
-
-  public TSeriesPartitionSlot getSeriesPartitionSlot() {
-    return seriesPartitionSlot;
-  }
-
-  public TTimePartitionSlot getTimePartitionSlot() {
-    return timePartitionSlot;
-  }
-
-  public TConsensusGroupId getDataRegionGroup() {
-    return dataRegionGroup;
-  }
-
-  @Override
-  public int compareTo(DataPartitionEntry o) {
-    // The timePartitionSlot will be in descending order
-    // After invoke Collections.sort()
-    if (!timePartitionSlot.equals(o.timePartitionSlot)) {
-      return o.timePartitionSlot.compareTo(timePartitionSlot);
-    }
-    return Integer.compare(weight, o.weight);
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    DataPartitionEntry that = (DataPartitionEntry) o;
-    return weight == that.weight
-        && seriesPartitionSlot.equals(that.seriesPartitionSlot)
-        && timePartitionSlot.equals(that.timePartitionSlot)
-        && dataRegionGroup.equals(that.dataRegionGroup);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(seriesPartitionSlot, timePartitionSlot, 
dataRegionGroup, weight);
-  }
-}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
index 09a49898635..1fe5ce1bcdf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/structure/BalanceTreeMap.java
@@ -26,14 +26,21 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.TreeMap;
 
+/**
+ * This class is used to store key-value pairs. It supports the following 
operations: 1. Put a
+ * key-value pair. 2. Get key with minimum value.
+ *
+ * @param <K> The type of Key
+ * @param <V> The type of Value, should be Comparable
+ */
 public class BalanceTreeMap<K, V extends Comparable<V>> {
 
   private final HashMap<K, V> keyValueMap;
-  private final TreeMap<V, Set<K>> valueKeyMap;
+  private final TreeMap<V, Set<K>> valueKeysMap;
 
   public BalanceTreeMap() {
     this.keyValueMap = new HashMap<>();
-    this.valueKeyMap = new TreeMap<>();
+    this.valueKeysMap = new TreeMap<>();
   }
 
   /**
@@ -43,19 +50,18 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
    * @param value Value
    */
   public void put(K key, V value) {
-    V oldValue = keyValueMap.getOrDefault(key, null);
-
     // Update keyValueMap
-    keyValueMap.put(key, value);
+    V oldValue = keyValueMap.put(key, value);
 
     // Update valueKeyMap
     if (oldValue != null) {
-      valueKeyMap.get(oldValue).remove(key);
-      if (valueKeyMap.get(oldValue).isEmpty()) {
-        valueKeyMap.remove(oldValue);
+      Set<K> keysSet = valueKeysMap.get(oldValue);
+      keysSet.remove(key);
+      if (keysSet.isEmpty()) {
+        valueKeysMap.remove(oldValue);
       }
     }
-    valueKeyMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
+    valueKeysMap.computeIfAbsent(value, empty -> new HashSet<>()).add(key);
   }
 
   /**
@@ -64,7 +70,7 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
    * @return Key with minimum value
    */
   public K getKeyWithMinValue() {
-    return valueKeyMap.firstEntry().getValue().iterator().next();
+    return valueKeysMap.firstEntry().getValue().iterator().next();
   }
 
   public V get(K key) {
@@ -81,18 +87,18 @@ public class BalanceTreeMap<K, V extends Comparable<V>> {
 
   @TestOnly
   public void remove(K key) {
-    V value = keyValueMap.getOrDefault(key, null);
+    V value = keyValueMap.remove(key);
     if (value != null) {
-      keyValueMap.remove(key);
-      valueKeyMap.get(value).remove(key);
-      if (valueKeyMap.get(value).isEmpty()) {
-        valueKeyMap.remove(value);
+      Set<K> keysSet = valueKeysMap.get(value);
+      keysSet.remove(key);
+      if (keysSet.isEmpty()) {
+        valueKeysMap.remove(value);
       }
     }
   }
 
   @TestOnly
   public boolean isEmpty() {
-    return keyValueMap.isEmpty() && valueKeyMap.isEmpty();
+    return keyValueMap.isEmpty() && valueKeysMap.isEmpty();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
deleted file mode 100644
index c92d4564b76..00000000000
--- 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/partition/DataPartitionEntryTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.commons.partition;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class DataPartitionEntryTest {
-
-  private static final int SERIES_SLOT_NUM = 1000;
-  private static final long TIME_PARTITION_INTERVAL =
-      CommonDescriptor.getInstance().getConfig().getTimePartitionInterval();
-
-  @Test
-  public void testOrder() {
-    List<DataPartitionEntry> entries = new ArrayList<>();
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      entries.add(
-          new DataPartitionEntry(
-              new TSeriesPartitionSlot(i),
-              new TTimePartitionSlot(TIME_PARTITION_INTERVAL * i),
-              new TConsensusGroupId(TConsensusGroupType.DataRegion, i)));
-    }
-
-    List<DataPartitionEntry> sortedEntries = new ArrayList<>(entries);
-    Collections.sort(sortedEntries);
-    for (int i = 0; i < SERIES_SLOT_NUM; i++) {
-      Assert.assertEquals(entries.get(SERIES_SLOT_NUM - i - 1), 
sortedEntries.get(i));
-    }
-  }
-}

Reply via email to