This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dff2f5f05a3 [improvement](disk balance) Impr disk rebalancer sched
when be load rebalancer idle #26412 (#28945)
dff2f5f05a3 is described below
commit dff2f5f05a3da880eb3471d6ea4d4867a85dbc4f
Author: deardeng <[email protected]>
AuthorDate: Thu Dec 28 10:19:21 2023 +0800
[improvement](disk balance) Impr disk rebalancer sched when be load
rebalancer idle #26412 (#28945)
---
.../java/org/apache/doris/catalog/Replica.java | 5 +
.../org/apache/doris/clone/BeLoadRebalancer.java | 1 +
.../org/apache/doris/clone/DiskRebalancer.java | 20 +--
.../apache/doris/clone/PartitionRebalancer.java | 12 --
.../java/org/apache/doris/clone/Rebalancer.java | 28 +++-
.../org/apache/doris/clone/TabletScheduler.java | 3 +
.../org/apache/doris/catalog/CatalogTestUtil.java | 46 +++++++
.../doris/clone/DiskReblanceWhenSchedulerIdle.java | 149 +++++++++++++++++++++
.../apache/doris/utframe/MockedBackendFactory.java | 37 ++++-
9 files changed, 276 insertions(+), 25 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index 8e1b3e75771..95803749034 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -209,6 +209,11 @@ public class Replica implements Writable {
return this.backendId;
}
+ // Test-only setter ("ut" = unit test): overwrites the id of the backend that
+ // this replica is recorded on. It does nothing besides assigning the field.
+ public void setBackendId(long backendId) {
+ this.backendId = backendId;
+ }
+
public long getDataSize() {
return dataSize;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 4e52024c7bc..93d884af5ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -370,4 +370,5 @@ public class BeLoadRebalancer extends Rebalancer {
throw new SchedException(Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"unable to find low backend");
}
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 42d2ab1ba39..bec896c5dfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -26,7 +26,6 @@ import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
-import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
@@ -42,7 +41,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
/*
* This DiskBalancer is different from other Balancers which takes care of
cluster-wide data balancing.
@@ -125,14 +123,16 @@ public class DiskRebalancer extends Rebalancer {
List<BackendLoadStatistic> highBEs = Lists.newArrayList();
clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs,
medium);
- if (Config.tablet_rebalancer_type.equalsIgnoreCase("partition")) {
- PartitionRebalancer rebalancer = (PartitionRebalancer)
Env.getCurrentEnv()
- .getTabletScheduler().getRebalancer();
- if (rebalancer != null && rebalancer.checkCacheEmptyForLong()) {
- midBEs.addAll(lowBEs);
- midBEs.addAll(highBEs);
- }
- } else if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+ Rebalancer rebalancer = FeConstants.runningUnitTest ? null
+ : Env.getCurrentEnv().getTabletScheduler().getRebalancer();
+ if (rebalancer != null &&
rebalancer.unPickOverLongTime(clusterStat.getTag(), medium)) {
+ midBEs.addAll(lowBEs);
+ midBEs.addAll(highBEs);
+ lowBEs.clear();
+ highBEs.clear();
+ }
+
+ if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
// the cluster is not balanced
if (prioBackends.isEmpty()) {
LOG.info("cluster is not balanced with medium: {}. skip",
medium);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index e8917eabf43..a6b8bf04b12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -64,8 +64,6 @@ public class PartitionRebalancer extends Rebalancer {
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
- private long cacheEmptyTimestamp = -1L;
-
public PartitionRebalancer(SystemInfoService infoService,
TabletInvertedIndex invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
super(infoService, invertedIndex, backendsWorkingSlots);
@@ -232,11 +230,6 @@ public class PartitionRebalancer extends Rebalancer {
return !bes.contains(move.fromBe) && bes.contains(move.toBe);
}
- // cache empty for 10 min
- public boolean checkCacheEmptyForLong() {
- return cacheEmptyTimestamp > 0 && System.currentTimeMillis() >
cacheEmptyTimestamp + 10 * 60 * 1000L;
- }
-
@Override
protected void completeSchedCtx(TabletSchedCtx tabletCtx)
throws SchedException {
@@ -329,11 +322,6 @@ public class PartitionRebalancer extends Rebalancer {
movesCacheMap.updateMapping(statisticMap,
Config.partition_rebalance_move_expire_after_access);
// Perform cache maintenance
movesCacheMap.maintain();
- if (movesCacheMap.size() > 0) {
- cacheEmptyTimestamp = -1;
- } else if (cacheEmptyTimestamp < 0) {
- cacheEmptyTimestamp = System.currentTimeMillis();
- }
LOG.debug("Move succeeded/total :{}/{}, current {}",
counterBalanceMoveSucceeded.get(),
counterBalanceMoveCreated.get(), movesCacheMap);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index f339418055b..8f6b1d229f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -25,8 +25,12 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.thrift.TStorageMedium;
+import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
@@ -46,6 +50,7 @@ import java.util.Map;
* 2. If you want to make sure the move is succeed, you can assume that it's
succeed when getToDeleteReplicaId called.
*/
public abstract class Rebalancer {
+ private static final Logger LOG = LogManager.getLogger(Rebalancer.class);
// When Rebalancer init, the statisticMap is usually empty. So it's no
need to be an arg.
// Only use updateLoadStatistic() to load stats.
protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
@@ -55,6 +60,14 @@ public abstract class Rebalancer {
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
+ // Last pick time per (tag, medium): tag -> (medium -> System.currentTimeMillis() value
+ // stored when candidate selection for that pair last returned a non-empty list).
+ // The table reference is assigned once and never replaced, hence final.
+ private final Table<Tag, TStorageMedium, Long> lastPickTimeTable = HashBasedTable.create();
+
+ // For unit tests only: hands out the live internal table (not a copy), so entries
+ // written by the caller are seen by this rebalancer.
+ public Table<Tag, TStorageMedium, Long> getLastPickTimeTable() {
+     return lastPickTimeTable;
+ }
+
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex,
Map<Long, PathSlot> backendsWorkingSlots) {
this.infoService = infoService;
@@ -66,7 +79,12 @@ public abstract class Rebalancer {
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
for (Map.Entry<Tag, LoadStatisticForTag> entry :
statisticMap.entrySet()) {
for (TStorageMedium medium : TStorageMedium.values()) {
-
alternativeTablets.addAll(selectAlternativeTabletsForCluster(entry.getValue(),
medium));
+ List<TabletSchedCtx> candidates =
+ selectAlternativeTabletsForCluster(entry.getValue(),
medium);
+ alternativeTablets.addAll(candidates);
+ if (!candidates.isEmpty()) {
+ lastPickTimeTable.put(entry.getKey(), medium,
System.currentTimeMillis());
+ }
}
}
return alternativeTablets;
@@ -77,6 +95,14 @@ public abstract class Rebalancer {
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium);
+ /**
+  * Tells whether nothing has been picked for the given tag and medium for a long time.
+  *
+  * @param tag backend tag, used as the row key of the last-pick table
+  * @param medium storage medium, used as the column key of the last-pick table
+  * @return true if no pick time is recorded for (tag, medium), or if the recorded time
+  *         is at least 5 minutes before the current time
+  */
+ protected boolean unPickOverLongTime(Tag tag, TStorageMedium medium) {
+     // Stays boxed on purpose: null means no pick was ever recorded for this pair.
+     Long lastPickTime = lastPickTimeTable.get(tag, medium);
+     // Primitive: there is no reason to box the current time just to subtract from it.
+     long now = System.currentTimeMillis();
+     LOG.debug("tag={}, medium={}, lastPickTime={}, now={}", tag, medium, lastPickTime, now);
+     return lastPickTime == null || now - lastPickTime >= 5 * 60 * 1000L;
+ }
+
public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
completeSchedCtx(tabletCtx);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 80aa8da0f71..4d84959155d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -171,6 +171,7 @@ public class TabletScheduler extends MasterDaemon {
return stat;
}
+ // Returns the rebalancer held by this scheduler. Per the author's note this is only
+ // ever the BE load rebalancer or the partition rebalancer.
public Rebalancer getRebalancer() {
return rebalancer;
}
@@ -274,6 +275,8 @@ public class TabletScheduler extends MasterDaemon {
return AddResult.ADDED;
}
+
+
public synchronized boolean containsTablet(long tabletId) {
return allTabletTypes.containsKey(tabletId);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index a05c63b812f..d1cdeba0e3f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -374,4 +374,50 @@ public class CatalogTestUtil {
olapTable.readUnlock();
}
}
+
+ /**
+  * Looks up the path hash of the replica of a tablet that sits on a given backend.
+  *
+  * @param tabletId id of the tablet to inspect
+  * @param backendId id of the backend whose replica is wanted
+  * @return the path hash of the matching replica, or -1 when the tablet meta, database,
+  *         table, partition, index, tablet or a replica on that backend cannot be found,
+  *         or when the table is not an OLAP table
+  */
+ public static long getReplicaPathHash(long tabletId, long backendId) {
+     Env env = Env.getCurrentEnv();
+     TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex();
+     TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+     if (tabletMeta == null) {
+         return -1L;
+     }
+
+     long dbId = tabletMeta.getDbId();
+     long tableId = tabletMeta.getTableId();
+     long partitionId = tabletMeta.getPartitionId();
+     long indexId = tabletMeta.getIndexId();
+     Database db = env.getInternalCatalog().getDbNullable(dbId);
+     if (db == null) {
+         return -1L;
+     }
+     Table table = db.getTableNullable(tableId);
+     if (table == null) {
+         return -1L;
+     }
+     if (table.getType() != Table.TableType.OLAP) {
+         return -1L;
+     }
+     OlapTable olapTable = (OlapTable) table;
+     // The partition/index/tablet walk is done under the table read lock.
+     olapTable.readLock();
+     try {
+         Partition partition = olapTable.getPartition(partitionId);
+         if (partition == null) {
+             return -1L;
+         }
+         MaterializedIndex materializedIndex = partition.getIndex(indexId);
+         if (materializedIndex == null) {
+             return -1L;
+         }
+         Tablet tablet = materializedIndex.getTablet(tabletId);
+         // Guarded like every lookup above, so a missing tablet yields -1 instead of an NPE.
+         if (tablet == null) {
+             return -1L;
+         }
+         for (Replica replica : tablet.getReplicas()) {
+             if (replica.getBackendId() == backendId) {
+                 return replica.getPathHash();
+             }
+         }
+     } finally {
+         olapTable.readUnlock();
+     }
+     // No replica of this tablet is placed on the requested backend.
+     return -1L;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
new file mode 100644
index 00000000000..8d65bffd10b
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskReblanceWhenSchedulerIdle.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Puts all four replicas of a table on one disk of one backend, marks the last pick
+ * time of the default tag / HDD medium as very old, enables balancing and then expects
+ * the replicas to end up two per disk on that backend.
+ */
+public class DiskReblanceWhenSchedulerIdle extends TestWithFeService {
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        Config.enable_round_robin_create_tablet = true;
+        Config.allow_replica_on_same_host = true;
+        Config.schedule_slot_num_per_hdd_path = 1;
+        // Balancing stays off until the test has arranged replicas and disks by hand.
+        Config.disable_balance = true;
+        Config.enable_debug_points = true;
+    }
+
+    @Override
+    protected int backendNum() {
+        return 2;
+    }
+
+    @Test
+    public void testDiskReblanceWhenSchedulerIdle() throws Exception {
+        // case start
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+        Assertions.assertEquals(backendNum(), backends.size());
+        for (Backend be : backends) {
+            Assertions.assertEquals(0, invertedIndex.getTabletNumByBackendId(be.getId()));
+        }
+
+        long totalCapacity = 10L << 30;
+
+        // Give every backend two identical 10 GiB disks with distinct path hashes.
+        for (int i = 0; i < backends.size(); i++) {
+            Map<String, DiskInfo> disks = Maps.newHashMap();
+            for (int j = 0; j < 2; j++) {
+                DiskInfo diskInfo = new DiskInfo("be_" + i + "_disk_" + j);
+                diskInfo.setTotalCapacityB(totalCapacity);
+                diskInfo.setDataUsedCapacityB(1L << 30);
+                diskInfo.setAvailableCapacityB(9L << 30);
+                diskInfo.setPathHash((1000L * (i + 1)) + 10 * j);
+                disks.put(diskInfo.getRootPath(), diskInfo);
+            }
+            backends.get(i).setDisks(ImmutableMap.copyOf(disks));
+        }
+        Backend be0 = backends.get(0);
+
+        createDatabase("test");
+        createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) "
+                + " BUCKETS 4 PROPERTIES ( \"replication_num\" = \"1\","
+                + " \"storage_medium\" = \"HDD\")");
+
+        Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(0).getId()));
+        Assertions.assertEquals(2, invertedIndex.getTabletNumByBackendId(backends.get(1).getId()));
+
+
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+        Assertions.assertNotNull(tbl);
+        Partition partition = tbl.getPartitions().iterator().next();
+        List<Tablet> tablets = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)
+                .iterator().next()
+                .getTablets();
+
+        DiskInfo diskInfo0 = be0.getDisks().values().asList().get(0);
+        DiskInfo diskInfo1 = be0.getDisks().values().asList().get(1);
+
+        // Re-home every replica onto the first disk of the first backend. The replica
+        // list is copied first because it is modified while being walked.
+        tablets.forEach(tablet -> {
+            Lists.newArrayList(tablet.getReplicas()).forEach(
+                    replica -> {
+                        if (replica.getBackendId() == backends.get(1).getId()) {
+                            replica.updateStat(totalCapacity / 4, 1);
+                            tablet.deleteReplica(replica);
+                            replica.setBackendId(backends.get(0).getId());
+                            replica.setPathHash(diskInfo0.getPathHash());
+                            tablet.addReplica(replica);
+                        } else {
+                            replica.setPathHash(diskInfo0.getPathHash());
+                        }
+                    }
+            );
+        });
+
+        diskInfo0.setAvailableCapacityB(0L);
+        diskInfo1.setAvailableCapacityB(totalCapacity);
+        DebugPointUtil.addDebugPointWithValue("FE.HIGH_LOAD_BE_ID", backends.get(1).getId());
+
+        // A last pick time of 0 is far more than five minutes in the past.
+        Table<Tag, TStorageMedium, Long> lastPickTimeTable =
+                Env.getCurrentEnv().getTabletScheduler().getRebalancer().getLastPickTimeTable();
+        lastPickTimeTable.put(Tag.DEFAULT_BACKEND_TAG, TStorageMedium.HDD, 0L);
+        Config.disable_balance = false;
+
+
+        // Leave the scheduler some time to act. An interrupt propagates unchanged through
+        // "throws Exception" instead of being rewrapped in a RuntimeException.
+        // NOTE(review): a fixed sleep makes the outcome timing dependent.
+        Thread.sleep(1000L);
+
+        // Count replicas per disk path hash.
+        Map<Long, Integer> gotDiskTabletNums = Maps.newHashMap();
+        tablets.forEach(tablet -> tablet.getReplicas().forEach(
+                replica -> gotDiskTabletNums.merge(replica.getPathHash(), 1, Integer::sum)));
+
+
+        Map<Long, Integer> expectTabletNums = Maps.newHashMap();
+        expectTabletNums.put(diskInfo0.getPathHash(), 2);
+        expectTabletNums.put(diskInfo1.getPathHash(), 2);
+
+        Assertions.assertEquals(expectTabletNums, gotDiskTabletNums);
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index a69b4a81cab..9d6f06a2266 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -19,6 +19,7 @@ package org.apache.doris.utframe;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.common.ClientPool;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
@@ -26,7 +27,7 @@ import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
-import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.FrontendService.Client;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TAgentPublishRequest;
import org.apache.doris.thrift.TAgentResult;
@@ -62,6 +63,7 @@ import org.apache.doris.thrift.TScanOpenResult;
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMediumMigrateReq;
import org.apache.doris.thrift.TStreamLoadRecordResult;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTabletStatResult;
@@ -164,7 +166,7 @@ public class MockedBackendFactory {
public void run() {
while (true) {
boolean ok = false;
- FrontendService.Client client = null;
+ Client client = null;
TNetworkAddress address = null;
try {
// ATTR: backend.getFeAddress must after
taskQueue.take, because fe addr thread race
@@ -190,6 +192,9 @@ public class MockedBackendFactory {
case CLONE:
handleCloneTablet(request,
finishTaskRequest);
break;
+ case STORAGE_MEDIUM_MIGRATE:
+ handleStorageMediumMigrate(request,
finishTaskRequest);
+ break;
default:
break;
}
@@ -245,6 +250,34 @@ public class MockedBackendFactory {
finishTaskRequest.setFinishTabletInfos(tabletInfos);
}
+ /**
+  * Mocks a storage-medium-migrate task: the tablet's data size is taken off the disk
+  * that holds this backend's replica of the tablet and added to the disk registered
+  * under the request's data dir. Either disk is left untouched when it cannot be found.
+  *
+  * @param request agent task carrying the storage-medium-migrate request
+  * @param finishTaskRequest finish report of the task; not modified here
+  */
+ private void handleStorageMediumMigrate(TAgentTaskRequest request,
+         TFinishTaskRequest finishTaskRequest) {
+     TStorageMediumMigrateReq req = request.getStorageMediumMigrateReq();
+     // At least 1, so a tablet reported with size 0 still changes the disk numbers.
+     long dataSize = Math.max(1, CatalogTestUtil.getTabletDataSize(req.tablet_id));
+
+     long srcDataPath = CatalogTestUtil.getReplicaPathHash(req.tablet_id, backendInFe.getId());
+     DiskInfo srcDiskInfo = getDisk(srcDataPath);
+     if (srcDiskInfo != null) {
+         // Data leaves the source disk: used space shrinks but never below zero, and
+         // available space grows but never beyond the disk's total capacity.
+         srcDiskInfo.setDataUsedCapacityB(Math.max(0L,
+                 srcDiskInfo.getDataUsedCapacityB() - dataSize));
+         srcDiskInfo.setAvailableCapacityB(Math.min(srcDiskInfo.getTotalCapacityB(),
+                 srcDiskInfo.getAvailableCapacityB() + dataSize));
+         srcDiskInfo.setState(DiskState.ONLINE);
+     }
+
+     DiskInfo destDiskInfo = getDisk(req.data_dir);
+     if (destDiskInfo != null) {
+         // Data arrives on the destination disk: used space grows up to the total
+         // capacity, available space shrinks down to zero.
+         destDiskInfo.setDataUsedCapacityB(Math.min(destDiskInfo.getTotalCapacityB(),
+                 destDiskInfo.getDataUsedCapacityB() + dataSize));
+         destDiskInfo.setAvailableCapacityB(Math.max(0L,
+                 destDiskInfo.getAvailableCapacityB() - dataSize));
+         destDiskInfo.setState(DiskState.ONLINE);
+     }
+ }
+
+ // Fetches the disk of this mocked backend that is registered under the given data
+ // dir key; the result of the lookup is returned as is.
+ private DiskInfo getDisk(String dataDir) {
+     final DiskInfo found = backendInFe.getDisks().get(dataDir);
+     return found;
+ }
+
private DiskInfo getDisk(long pathHash) {
DiskInfo diskInfo = null;
for (DiskInfo tmpDiskInfo :
backendInFe.getDisks().values()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]