chenhao7253886 commented on a change in pull request #336: First commit of new
tablet repair framework
URL: https://github.com/apache/incubator-doris/pull/336#discussion_r235583924
##########
File path: fe/src/main/java/org/apache/doris/clone/TabletFactory.java
##########
@@ -0,0 +1,655 @@
+package org.apache.doris.clone;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.Tablet.TabletStatus;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.Daemon;
+import org.apache.doris.persist.ReplicaPersistInfo;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.CloneTask;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/*
+ * TabletFactory saves the tablets produced by TabletScanner and tries to repair them.
+ * It also tries to balance the cluster load if there are no tablets that need to be repaired.
+ *
+ * We expect it to be an efficient way to recover the entire cluster and make it balanced.
+ * Case 1:
+ * A Backend is down. All tablets which have a replica on this BE should be repaired as soon as possible.
+ *
+ * Case 1.1:
+ * As a Backend is down, some tables should be repaired with high priority. So the repair task should be able
+ * to be preempted.
+ *
+ * Case 2:
+ * A new Backend is added to the cluster. Replicas should be transferred to that host to balance the cluster load.
+ * 1 sec
+ */
+public class TabletFactory extends Daemon {
+ private static final Logger LOG = LogManager.getLogger(TabletFactory.class);
+
+ // Tablets waiting to be handled. No comparator is given, so the queue orders
+ // its elements by TabletInfo's natural ordering.
+ private PriorityBlockingQueue<TabletInfo> q = new PriorityBlockingQueue<>();
+ // Ids of the tablets accepted by addTablet(); used there to reject duplicates.
+ private Set<Long> runningTabletIds = Sets.newHashSet();
+
+ private static int BATCH_NUM = 10; // handle at most BATCH_NUM tablets in one loop
+
+ /*
+  * Initial number of working slots per storage path; passed to the Slot constructor
+  * together with the backend's path hashes.
+  * We will increase or decrease the slot num dynamically based on the clone task statistic info
+  */
+ private static int SLOT_NUM_PER_PATH = 1;
+
+ // be id -> working slots (a Slot object) of that backend
+ private Map<Long, Slot> backendsWorkingSlots = Maps.newConcurrentMap();
+
+ private Catalog catalog;
+ private SystemInfoService infoService;
+ private TabletInvertedIndex invertedIndex;
+
+ // cluster name -> load statistic
+ private Map<String, ClusterLoadStatistic> statisticMap = Maps.newConcurrentMap();
+ // NOTE(review): presumably the timestamp of the last statistic refresh, compared
+ // against UPDATE_INTERVAL_MS; the code using these two fields is not in this hunk.
+ private long lastLoadStatisticUpdateTime = 0;
+ private static long UPDATE_INTERVAL_MS = 60 * 1000;
+
+ // Checked by runOneCycle(): while false, init() is attempted before any work is done.
+ private boolean isInit = false;
+
+ /*
+  * Creates the daemon with the name "tablet factory" and an interval argument of 1000
+  * (presumably milliseconds, matching the "1 sec" mentioned in the runOneCycle comment),
+  * and caches the current Catalog, SystemInfoService and TabletInvertedIndex.
+  */
+ public TabletFactory() {
+     super("tablet factory", 1000);
+     this.catalog = Catalog.getCurrentCatalog();
+     this.infoService = Catalog.getCurrentSystemInfo();
+     this.invertedIndex = Catalog.getCurrentInvertedIndex();
+ }
+
+ public boolean init() {
+ return initWorkingSlots();
+ }
+
+ /*
+  * Create one Slot for every backend returned by infoService.getBackendsInCluster(null)
+  * and register it in backendsWorkingSlots under the backend's id. Each Slot is built from
+  * the path hashes of the backend's disks and SLOT_NUM_PER_PATH.
+  *
+  * NOTE(review): this method currently always returns false (see the TODO at the end),
+  * so init() never reports success. Confirm this is intended while the path hash
+  * information is not yet available cluster-wide.
+  */
+ private boolean initWorkingSlots() {
+     ImmutableMap<Long, Backend> allBackends = infoService.getBackendsInCluster(null);
+     for (Backend backend : allBackends.values()) {
+         // one path hash per disk of this backend
+         List<Long> diskPathHashes = backend.getDisks().values().stream()
+                 .map(disk -> disk.getPathHash())
+                 .collect(Collectors.toList());
+         Slot backendSlot = new Slot(diskPathHashes, SLOT_NUM_PER_PATH);
+         backendsWorkingSlots.put(backend.getId(), backendSlot);
+         LOG.info("init backend {} working slots: {}", backend.getId(), backend.getDisks().size());
+     }
+
+     // TODO(cmy): the path hash in DiskInfo may not be available before the entire cluster being upgraded
+     // to the latest version.
+     // So we have to wait until we get all path hash info.
+     return false;
+ }
+
+ /*
+  * Synchronize backendsWorkingSlots with the backends currently returned by
+  * infoService.getBackendsInCluster(null):
+  *   1. backends that are still present get their Slot updated with the current path hashes;
+  *   2. backends that are no longer present are removed from the map;
+  *   3. backends that are not yet in the map get a new Slot.
+  */
+ public void updateWorkingSlots() {
+     ImmutableMap<Long, Backend> currentBackends = infoService.getBackendsInCluster(null);
+
+     // step 1: refresh the known backends and remember the ones that disappeared
+     Set<Long> vanishedBeIds = Sets.newHashSet();
+     for (Map.Entry<Long, Slot> entry : backendsWorkingSlots.entrySet()) {
+         Backend backend = currentBackends.get(entry.getKey());
+         if (backend == null) {
+             vanishedBeIds.add(entry.getKey());
+             continue;
+         }
+         List<Long> diskPathHashes = backend.getDisks().values().stream()
+                 .map(disk -> disk.getPathHash())
+                 .collect(Collectors.toList());
+         entry.getValue().updateSlots(diskPathHashes);
+     }
+
+     // step 2: drop the slots of backends that no longer exist
+     for (Long vanishedBeId : vanishedBeIds) {
+         backendsWorkingSlots.remove(vanishedBeId);
+     }
+
+     // step 3: create slots for backends seen for the first time
+     for (Backend backend : currentBackends.values()) {
+         if (backendsWorkingSlots.containsKey(backend.getId())) {
+             continue;
+         }
+         List<Long> diskPathHashes = backend.getDisks().values().stream()
+                 .map(disk -> disk.getPathHash())
+                 .collect(Collectors.toList());
+         Slot backendSlot = new Slot(diskPathHashes, SLOT_NUM_PER_PATH);
+         backendsWorkingSlots.put(backend.getId(), backendSlot);
+         LOG.info("init backend {} working slots: {}", backend.getId(), backend.getDisks().size());
+     }
+ }
+
+ /*
+  * Currently a no-op: the only statement in the body is commented out, and the constant
+  * it refers to (INIT_BACKEND_WORKING_SLOT) is not declared in the visible part of this class.
+  * New backends are still picked up by updateWorkingSlots(), which creates a Slot for every
+  * backend that is not yet in backendsWorkingSlots.
+  */
+ public void addNewBackend(long newBackendId) {
+ // backendsWorkingSlots.putIfAbsent(newBackendId, INIT_BACKEND_WORKING_SLOT);
+ }
+
+ /*
+  * Remove the working slots registered for the given backend id.
+  * Does nothing if the id is not present in backendsWorkingSlots.
+  */
+ public void deleteBackend(long backendId) {
+     final Long slotKey = backendId;
+     backendsWorkingSlots.remove(slotKey);
+ }
+
+ /*
+  * Returns the be id -> Slot map held by this factory.
+  * The internal map itself is returned, not a copy.
+  */
+ public Map<Long, Slot> getBackendsWorkingSlots() {
+     return this.backendsWorkingSlots;
+ }
+
+ public synchronized boolean addTablet(TabletInfo tablet) {
+ if (runningTabletIds.contains(tablet.getTabletId())) {
+ return false;
+ }
+ runningTabletIds.add(tablet.getTabletId());
+ q.offer(tablet);
+ return true;
+ }
+
+ /* Returns true if there is no tablet in the queue. */
+ public boolean isEmpty() {
+     return q.size() == 0;
+ }
+
+ /*
+ * TabletFactory runs as a daemon thread at a very short interval (default 1 sec).
+ * It tries to repair the tablets in the queue, and tries to balance the cluster if possible.
+ */
+ @Override
+ protected void runOneCycle() {
+ if (!isInit && !init()) {
+ // not ready to start.
+ return;
+ }
+
+ updateClusterLoadStatisticsIfNecessary();
+
+ handleQ();
+
Review comment:
Q?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]