Repository: hbase Updated Branches: refs/heads/branch-2.1 9d34b4581 -> 5a300f3fc
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java new file mode 100644 index 0000000..18d7823 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureMap.java @@ -0,0 +1,607 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store.wal; + +import java.io.IOException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * We keep an in-memory map of the procedures sorted by replay order. (see the details in the + * beginning of {@link ProcedureWALFormatReader}). 
+ * + * <pre> + * procedureMap = | A | | E | | C | | | | | G | | | + * D B + * replayOrderHead = C <-> B <-> E <-> D <-> A <-> G + * + * We also have a lazy grouping by "root procedure", and a list of + * unlinked procedures. If after reading all the WALs we have unlinked + * procedures it means that we had a missing WAL or a corruption. + * rootHead = A <-> D <-> G + * B E + * C + * unlinkFromLinkList = None + * </pre> + */ +class WALProcedureMap { + + private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); + + private static class Entry { + // For bucketed linked lists in hash-table. + private Entry hashNext; + // child head + private Entry childHead; + // double-link for rootHead or childHead + private Entry linkNext; + private Entry linkPrev; + // replay double-linked-list + private Entry replayNext; + private Entry replayPrev; + // procedure-infos + private Procedure<?> procedure; + private ProcedureProtos.Procedure proto; + private boolean ready = false; + + public Entry(Entry hashNext) { + this.hashNext = hashNext; + } + + public long getProcId() { + return proto.getProcId(); + } + + public long getParentId() { + return proto.getParentId(); + } + + public boolean hasParent() { + return proto.hasParentId(); + } + + public boolean isReady() { + return ready; + } + + public boolean isFinished() { + if (!hasParent()) { + // we only consider 'root' procedures. because for the user 'finished' + // means when everything up to the 'root' is finished. 
+ switch (proto.getState()) { + case ROLLEDBACK: + case SUCCESS: + return true; + default: + break; + } + } + return false; + } + + public Procedure<?> convert() throws IOException { + if (procedure == null) { + procedure = ProcedureUtil.convertToProcedure(proto); + } + return procedure; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Entry("); + sb.append(getProcId()); + sb.append(", parentId="); + sb.append(getParentId()); + sb.append(", class="); + sb.append(proto.getClassName()); + sb.append(")"); + return sb.toString(); + } + } + + private static class EntryIterator implements ProcedureIterator { + private final Entry replayHead; + private Entry current; + + public EntryIterator(Entry replayHead) { + this.replayHead = replayHead; + this.current = replayHead; + } + + @Override + public void reset() { + this.current = replayHead; + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public boolean isNextFinished() { + return current != null && current.isFinished(); + } + + @Override + public void skipNext() { + current = current.replayNext; + } + + @Override + public Procedure<?> next() throws IOException { + try { + return current.convert(); + } finally { + current = current.replayNext; + } + } + } + + // procedure hash table + private Entry[] procedureMap; + + // replay-order double-linked-list + private Entry replayOrderHead; + private Entry replayOrderTail; + + // root linked-list + private Entry rootHead; + + // pending unlinked children (root not present yet) + private Entry childUnlinkedHead; + + // Track ProcId range + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; + + public WALProcedureMap(int size) { + procedureMap = new Entry[size]; + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + } + + public void add(ProcedureProtos.Procedure procProto) { + 
trackProcIds(procProto.getProcId()); + Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); + boolean newEntry = entry.proto == null; + // We have seen procedure WALs where the entries are out of order; see HBASE-18152. + // To compensate, only replace the Entry procedure if for sure this new procedure + // is indeed an entry that came later. + // TODO: Fix the writing of procedure info so it does not violate basic expectation, that WALs + // contain procedure changes going from start to finish in sequence. + if (newEntry || isIncreasing(entry.proto, procProto)) { + entry.proto = procProto; + } + addToReplayList(entry); + if (newEntry) { + if (procProto.hasParentId()) { + childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); + } else { + rootHead = addToLinkList(entry, rootHead); + } + } + } + + /** + * @return True if this new procedure is 'richer' than the current one else false and we log this + * incidence where it appears that the WAL has older entries appended after newer ones. + * See HBASE-18152. + */ + private static boolean isIncreasing(ProcedureProtos.Procedure current, + ProcedureProtos.Procedure candidate) { + // Check that the procedures we see are 'increasing'. We used to compare + // procedure id first and then update time but it can legitimately go backwards if the + // procedure is failed or rolled back so that was unreliable. Was going to compare + // state but let's see if comparing update time is enough (unfortunately this issue is only + // seen under load...) + boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); + if (!increasing) { + LOG.warn("NOT INCREASING! 
current=" + current + ", candidate=" + candidate); + } + return increasing; + } + + public boolean remove(long procId) { + trackProcIds(procId); + Entry entry = removeFromMap(procId); + if (entry != null) { + unlinkFromReplayList(entry); + unlinkFromLinkList(entry); + return true; + } + return false; + } + + private void trackProcIds(long procId) { + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); + } + + public long getMinModifiedProcId() { + return minModifiedProcId; + } + + public long getMaxModifiedProcId() { + return maxModifiedProcId; + } + + public boolean contains(long procId) { + return getProcedure(procId) != null; + } + + public boolean isEmpty() { + return replayOrderHead == null; + } + + public void clear() { + for (int i = 0; i < procedureMap.length; ++i) { + procedureMap[i] = null; + } + replayOrderHead = null; + replayOrderTail = null; + rootHead = null; + childUnlinkedHead = null; + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; + } + + /* + * Merges two WALProcedureMaps: the target is the "global" map, the source is the "local" map. + * - The entries in the hashtables are guaranteed to be unique. On replay we don't load procedures + * that already exist in the "global" map (the one we are merging the "local" in to). + * - The replayOrderList of the "local" map will be appended to the "global" map replay list. + * - The "local" map will be cleared at the end of the operation. 
+ */ + public void mergeTail(WALProcedureMap other) { + for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { + int slotIndex = getMapSlot(p.getProcId()); + p.hashNext = procedureMap[slotIndex]; + procedureMap[slotIndex] = p; + } + + if (replayOrderHead == null) { + replayOrderHead = other.replayOrderHead; + replayOrderTail = other.replayOrderTail; + rootHead = other.rootHead; + childUnlinkedHead = other.childUnlinkedHead; + } else { + // append replay list + assert replayOrderTail.replayNext == null; + assert other.replayOrderHead.replayPrev == null; + replayOrderTail.replayNext = other.replayOrderHead; + other.replayOrderHead.replayPrev = replayOrderTail; + replayOrderTail = other.replayOrderTail; + + // merge rootHead + if (rootHead == null) { + rootHead = other.rootHead; + } else if (other.rootHead != null) { + Entry otherTail = findLinkListTail(other.rootHead); + otherTail.linkNext = rootHead; + rootHead.linkPrev = otherTail; + rootHead = other.rootHead; + } + + // merge childUnlinkedHead + if (childUnlinkedHead == null) { + childUnlinkedHead = other.childUnlinkedHead; + } else if (other.childUnlinkedHead != null) { + Entry otherTail = findLinkListTail(other.childUnlinkedHead); + otherTail.linkNext = childUnlinkedHead; + childUnlinkedHead.linkPrev = otherTail; + childUnlinkedHead = other.childUnlinkedHead; + } + } + maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); + minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); + + other.clear(); + } + + /** + * Returns an EntryIterator with the list of procedures ready to be added to the executor. A + * Procedure is ready if its children and parent are ready. 
+ */ + public ProcedureIterator fetchReady() { + buildGraph(); + + Entry readyHead = null; + Entry readyTail = null; + Entry p = replayOrderHead; + while (p != null) { + Entry next = p.replayNext; + if (p.isReady()) { + unlinkFromReplayList(p); + if (readyTail != null) { + readyTail.replayNext = p; + p.replayPrev = readyTail; + } else { + p.replayPrev = null; + readyHead = p; + } + readyTail = p; + p.replayNext = null; + } + p = next; + } + // we need the hash-table lookups for parents, so this must be done + // out of the loop where we check isReady() + for (p = readyHead; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + unlinkFromLinkList(p); + } + return readyHead != null ? new EntryIterator(readyHead) : null; + } + + /** + * Drain this map and return all procedures in it. + */ + public ProcedureIterator fetchAll() { + Entry head = replayOrderHead; + for (Entry p = head; p != null; p = p.replayNext) { + removeFromMap(p.getProcId()); + } + for (int i = 0; i < procedureMap.length; ++i) { + assert procedureMap[i] == null : "map not empty i=" + i; + } + replayOrderHead = null; + replayOrderTail = null; + childUnlinkedHead = null; + rootHead = null; + return head != null ? new EntryIterator(head) : null; + } + + private void buildGraph() { + Entry p = childUnlinkedHead; + while (p != null) { + Entry next = p.linkNext; + Entry rootProc = getRootProcedure(p); + if (rootProc != null) { + rootProc.childHead = addToLinkList(p, rootProc.childHead); + } + p = next; + } + + for (p = rootHead; p != null; p = p.linkNext) { + checkReadyToRun(p); + } + } + + private Entry getRootProcedure(Entry entry) { + while (entry != null && entry.hasParent()) { + entry = getProcedure(entry.getParentId()); + } + return entry; + } + + /** + * (see the comprehensive explanation in the beginning of {@link ProcedureWALFormatReader}). A + * Procedure is ready when parent and children are ready. "ready" means that we have all the + * information that we need in-memory. 
+ * <p/> + * Example-1:<br/> + * We have two WALs; we start reading from the newest (wal-2): + * + * <pre> + * wal-2 | C B | + * wal-1 | A B C | + * </pre> + * + * If C and B don't depend on A (A is not the parent), we can start them before reading wal-1. If + * B is the only one with parent A we can start C. We have to read one more WAL before being able + * to start B. + * <p/> + * How do we know, with only the information in B, that we are not ready? + * <ul> + * <li>easy case: the parent is missing from the global map</li> + * <li>more complex case: we look at the Stack IDs.</li> + * </ul> + * The Stack-IDs are added to the procedure order as an incremental index tracking how many times + * that procedure was executed, which is equivalent to the number of times we wrote the procedure + * to the WAL. <br/> + * In the example above: + * + * <pre> + * wal-2: B has stackId = [1, 2] + * wal-1: B has stackId = [1] + * wal-1: A has stackId = [0] + * </pre> + * + * Since we know that the Stack-IDs are incremental for a Procedure, we notice that there is a gap + * in the stackIds of B, so something was executed before. + * <p/> + * To identify when a Procedure is ready we sum the stackIds of the procedure and the + * parent. If the stackIdSum is equal to the sum of {1..maxStackId} then everything we need is + * available. + * <p/> + * Example-2: + * + * <pre> + * wal-2 | A | A stackIds = [0, 2] + * wal-1 | A B | B stackIds = [1] + * </pre> + * + * There is a gap in A's stackIds, so something was executed in between. 
+ */ + private boolean checkReadyToRun(Entry rootEntry) { + assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; + + if (rootEntry.isFinished()) { + // If the root procedure is finished, sub-procedures should be gone + if (rootEntry.childHead != null) { + LOG.error("unexpected active children for root-procedure: {}", rootEntry); + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + LOG.error("unexpected active children: {}", p); + } + } + + assert rootEntry.childHead == null : "unexpected children on root completion. " + rootEntry; + rootEntry.ready = true; + return true; + } + + int stackIdSum = 0; + int maxStackId = 0; + for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { + int stackId = 1 + rootEntry.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, + rootEntry); + } + + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + for (int i = 0; i < p.proto.getStackIdCount(); ++i) { + int stackId = 1 + p.proto.getStackId(i); + maxStackId = Math.max(maxStackId, stackId); + stackIdSum += stackId; + LOG.trace("stackId={} stackIdSum={} maxStackid={} {}", stackId, stackIdSum, maxStackId, p); + } + } + // The cmpStackIdSum is this formula for finding the sum of a series of numbers: + // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg + final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); + if (cmpStackIdSum == stackIdSum) { + rootEntry.ready = true; + for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { + p.ready = true; + } + return true; + } + return false; + } + + private void unlinkFromReplayList(Entry entry) { + if (replayOrderHead == entry) { + replayOrderHead = entry.replayNext; + } + if (replayOrderTail == entry) { + replayOrderTail = entry.replayPrev; + } + if (entry.replayPrev != null) { + 
entry.replayPrev.replayNext = entry.replayNext; + } + if (entry.replayNext != null) { + entry.replayNext.replayPrev = entry.replayPrev; + } + } + + private void addToReplayList(final Entry entry) { + unlinkFromReplayList(entry); + entry.replayNext = replayOrderHead; + entry.replayPrev = null; + if (replayOrderHead != null) { + replayOrderHead.replayPrev = entry; + } else { + replayOrderTail = entry; + } + replayOrderHead = entry; + } + + private void unlinkFromLinkList(Entry entry) { + if (entry == rootHead) { + rootHead = entry.linkNext; + } else if (entry == childUnlinkedHead) { + childUnlinkedHead = entry.linkNext; + } + if (entry.linkPrev != null) { + entry.linkPrev.linkNext = entry.linkNext; + } + if (entry.linkNext != null) { + entry.linkNext.linkPrev = entry.linkPrev; + } + } + + private Entry addToLinkList(Entry entry, Entry linkHead) { + unlinkFromLinkList(entry); + entry.linkNext = linkHead; + entry.linkPrev = null; + if (linkHead != null) { + linkHead.linkPrev = entry; + } + return entry; + } + + private Entry findLinkListTail(Entry linkHead) { + Entry tail = linkHead; + while (tail.linkNext != null) { + tail = tail.linkNext; + } + return tail; + } + + private Entry addToMap(long procId, boolean hasParent) { + int slotIndex = getMapSlot(procId); + Entry entry = getProcedure(slotIndex, procId); + if (entry != null) { + return entry; + } + + entry = new Entry(procedureMap[slotIndex]); + procedureMap[slotIndex] = entry; + return entry; + } + + private Entry removeFromMap(final long procId) { + int slotIndex = getMapSlot(procId); + Entry prev = null; + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + if (prev != null) { + prev.hashNext = entry.hashNext; + } else { + procedureMap[slotIndex] = entry.hashNext; + } + entry.hashNext = null; + return entry; + } + prev = entry; + entry = entry.hashNext; + } + return null; + } + + private Entry getProcedure(long procId) { + return 
getProcedure(getMapSlot(procId), procId); + } + + private Entry getProcedure(int slotIndex, long procId) { + Entry entry = procedureMap[slotIndex]; + while (entry != null) { + if (procId == entry.getProcId()) { + return entry; + } + entry = entry.hashNext; + } + return null; + } + + private int getMapSlot(long procId) { + return (int) (Procedure.getProcIdHashCode(procId) % procedureMap.length); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 951f05e..b3f5d10 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -15,14 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.Iterator; @@ -35,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -52,25 +49,60 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; /** * WAL implementation of the ProcedureStore. + * <p/> + * When starting, the upper layer will first call {@link #start(int)}, then {@link #recoverLease()}, + * then {@link #load(ProcedureLoader)}. + * <p/> + * In {@link #recoverLease()}, we will get the lease by closing all the existing wal files(by + * calling recoverFileLease), and creating a new wal writer. 
And we will also get the list of all + * the old wal files. + * <p/> + * FIXME: notice that the current recover lease implementation is problematic, it cannot deal with + * the races if there are two masters which both want to acquire the lease... + * <p/> + * In the {@link #load(ProcedureLoader)} method, we will load all the active procedures. See the + * comments of this method for more details. + * <p/> + * The actual logging mechanism is a bit like our FileSystem based WAL implementation on the RS side. There is + * a {@link #slots}, which is more like a ring buffer, and in the insert, update and delete + * methods we will put things into the {@link #slots} and wait. And there is a background sync + * thread (see the {@link #syncLoop()} method) which gets data from the {@link #slots}, writes it + * to the FileSystem, and notifies the caller that we have finished. + * <p/> + * TODO: try using disruptor to increase performance and simplify the logic? + * <p/> + * The {@link #storeTracker} keeps track of the modified procedures in the newest wal file, which is + * also the one being written currently. And the deleted bits in it are for all the procedures, not + * only the ones in the newest wal file. And when rolling a log, we will first store it in the + * trailer of the current wal file, and then reset its modified bits, so that it can start to track + * the modified procedures for the new wal file. + * <p/> + * The {@link #holdingCleanupTracker} is used to test whether we are safe to delete the oldest wal + * file. When a log roll happens and there is more than one wal file, we will make use of it. It + * will first be initialized to the oldest file's tracker (which is stored in the trailer), using the + * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, boolean)}, and then merged + * with the tracker of every newer wal file, using + * {@link ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}. 
If we find out + * that all the modified procedures for the oldest wal file are modified or deleted in newer wal + * files, then we can delete it. * @see ProcedureWALPrettyPrinter for printing content of a single WAL. * @see #main(String[]) to parse a directory of MasterWALProcs. */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class WALProcedureStore extends ProcedureStoreBase { private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class); public static final String LOG_PREFIX = "pv2-"; @@ -166,7 +198,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private int syncWaitMsec; // Variables used for UI display - private CircularFifoQueue syncMetricsQueue; + private CircularFifoQueue<SyncMetrics> syncMetricsQueue; public static class SyncMetrics { private long timestamp; @@ -228,11 +260,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // Create archive dir up front. Rename won't work w/o it up on HDFS. if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir); - } + LOG.debug("Created Procedure Store WAL archive dir {}", this.walArchiveDir); } else { - LOG.warn("Failed create of " + this.walArchiveDir); + LOG.warn("Failed create of {}", this.walArchiveDir); } } } @@ -248,7 +278,7 @@ public class WALProcedureStore extends ProcedureStoreBase { runningProcCount = numSlots; syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; - slotsCache = new LinkedTransferQueue(); + slotsCache = new LinkedTransferQueue<>(); while (slotsCache.size() < numSlots) { slotsCache.offer(new ByteSlot()); } @@ -267,7 +297,7 @@ public class WALProcedureStore extends ProcedureStoreBase { useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, DEFAULT_USE_HSYNC); // WebUI - syncMetricsQueue = new CircularFifoQueue( + syncMetricsQueue = new CircularFifoQueue<>( 
conf.getInt(STORE_WAL_SYNC_STATS_COUNT, DEFAULT_SYNC_STATS_COUNT)); // Init sync thread @@ -394,9 +424,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We have the lease on the log oldLogs = getLogFiles(); if (getMaxLogId(oldLogs) > flushLogId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); - } + LOG.debug("Someone else created new logs. Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); continue; } @@ -410,7 +438,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void load(final ProcedureLoader loader) throws IOException { + public void load(ProcedureLoader loader) throws IOException { lock.lock(); try { if (logs.isEmpty()) { @@ -425,7 +453,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Load the old logs - final Iterator<ProcedureWALFile> it = logs.descendingIterator(); + Iterator<ProcedureWALFile> it = logs.descendingIterator(); it.next(); // Skip the current log ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { @@ -485,7 +513,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure proc, final Procedure[] subprocs) { + public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs)); } @@ -519,7 +547,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void insert(final Procedure[] procs) { + public void insert(Procedure<?>[] procs) { if (LOG.isTraceEnabled()) { LOG.trace("Insert " + Arrays.toString(procs)); } @@ -548,7 +576,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void update(final Procedure proc) { + public void update(Procedure<?> proc) { if (LOG.isTraceEnabled()) { LOG.trace("Update " + proc); } @@ -571,11 +599,8 @@ public class 
WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("Delete " + procId); - } - + public void delete(long procId) { + LOG.trace("Delete {}", procId); ByteSlot slot = acquireSlot(); try { // Serialize the delete @@ -594,7 +619,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void delete(final Procedure proc, final long[] subProcIds) { + public void delete(Procedure<?> proc, long[] subProcIds) { assert proc != null : "expected a non-null procedure"; assert subProcIds != null && subProcIds.length > 0 : "expected subProcIds"; if (LOG.isTraceEnabled()) { @@ -630,7 +655,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private void delete(final long[] procIds) { + private void delete(long[] procIds) { if (LOG.isTraceEnabled()) { LOG.trace("Delete " + Arrays.toString(procIds)); } @@ -736,20 +761,20 @@ public class WALProcedureStore extends ProcedureStoreBase { storeTracker.insert(subProcIds); } else { storeTracker.insert(procId, subProcIds); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; case UPDATE: storeTracker.update(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); break; case DELETE: if (subProcIds != null && subProcIds.length > 0) { storeTracker.delete(subProcIds); - holdingCleanupTracker.setDeletedIfSet(subProcIds); + holdingCleanupTracker.setDeletedIfModified(subProcIds); } else { storeTracker.delete(procId); - holdingCleanupTracker.setDeletedIfSet(procId); + holdingCleanupTracker.setDeletedIfModified(procId); } break; default: @@ -973,16 +998,12 @@ public class WALProcedureStore extends ProcedureStoreBase { private void periodicRoll() throws IOException { if (storeTracker.isEmpty()) { - if (LOG.isTraceEnabled()) { - LOG.trace("no active procedures"); - } + LOG.trace("no active procedures"); 
tryRollWriter(); removeAllLogs(flushLogId - 1); } else { - if (storeTracker.isUpdated()) { - if (LOG.isTraceEnabled()) { - LOG.trace("all the active procedures are in the latest log"); - } + if (storeTracker.isAllModified()) { + LOG.trace("all the active procedures are in the latest log"); removeAllLogs(flushLogId - 1); } @@ -997,18 +1018,20 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriter() throws IOException { - if (!isRunning()) return false; + if (!isRunning()) { + return false; + } // Create new state-log if (!rollWriter(flushLogId + 1)) { - LOG.warn("someone else has already created log " + flushLogId); + LOG.warn("someone else has already created log {}", flushLogId); return false; } // We have the lease on the log, // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { - LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); + LOG.warn("Someone else created new logs. 
Expected maxLogId < {}", flushLogId); logs.getLast().removeFile(this.walArchiveDir); return false; } @@ -1064,7 +1087,7 @@ public class WALProcedureStore extends ProcedureStoreBase { closeCurrentLogStream(); - storeTracker.resetUpdates(); + storeTracker.resetModified(); stream = newStream; flushLogId = logId; totalSynced.set(0); @@ -1092,12 +1115,12 @@ public class WALProcedureStore extends ProcedureStoreBase { try { ProcedureWALFile log = logs.getLast(); - log.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId()); + log.setProcIds(storeTracker.getModifiedMinProcId(), storeTracker.getModifiedMaxProcId()); log.updateLocalTracker(storeTracker); long trailerSize = ProcedureWALFormat.writeTrailer(stream, storeTracker); log.addToSize(trailerSize); } catch (IOException e) { - LOG.warn("Unable to write the trailer: " + e.getMessage()); + LOG.warn("Unable to write the trailer", e); } try { stream.close(); @@ -1134,9 +1157,9 @@ public class WALProcedureStore extends ProcedureStoreBase { // - the other WALs are scanned to remove procs already in other wals. // TODO: exit early if holdingCleanupTracker.isEmpty() holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true); - holdingCleanupTracker.setDeletedIfSet(storeTracker); + holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker); for (int i = 1, size = logs.size() - 1; i < size; ++i) { - holdingCleanupTracker.setDeletedIfSet(logs.get(i).getTracker()); + holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker()); } } @@ -1144,12 +1167,12 @@ public class WALProcedureStore extends ProcedureStoreBase { * Remove all logs with logId <= {@code lastLogId}. 
*/ private void removeAllLogs(long lastLogId) { - if (logs.size() <= 1) return; - - if (LOG.isTraceEnabled()) { - LOG.trace("Remove all state logs with ID less than " + lastLogId); + if (logs.size() <= 1) { + return; } + LOG.trace("Remove all state logs with ID less than {}", lastLogId); + boolean removed = false; while (logs.size() > 1) { ProcedureWALFile log = logs.getFirst(); @@ -1167,14 +1190,10 @@ public class WALProcedureStore extends ProcedureStoreBase { private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { try { - if (LOG.isTraceEnabled()) { - LOG.trace("Removing log=" + log); - } + LOG.trace("Removing log={}", log); log.removeFile(walArchiveDir); logs.remove(log); - if (LOG.isDebugEnabled()) { - LOG.info("Removed log=" + log + ", activeLogs=" + logs); - } + LOG.debug("Removed log={}, activeLogs={}", log, logs); assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { LOG.error("Unable to remove log: " + log, e); @@ -1238,50 +1257,53 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private static long getMaxLogId(final FileStatus[] logFiles) { - long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - maxLogId = Math.max(maxLogId, getLogIdFromName(logFiles[i].getPath().getName())); - } + /** + * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. + * @return Max-LogID of the specified log file set + */ + private static long getMaxLogId(FileStatus[] logFiles) { + if (logFiles == null || logFiles.length == 0) { + return 0L; } - return maxLogId; + return getLogIdFromName(logFiles[logFiles.length - 1].getPath().getName()); } /** + * Make sure that the file set are gotten by calling {@link #getLogFiles()}, where we will sort + * the file set by log id. 
* @return Max-LogID of the specified log file set */ - private long initOldLogs(final FileStatus[] logFiles) throws IOException { - this.logs.clear(); - + private long initOldLogs(FileStatus[] logFiles) throws IOException { + if (logFiles == null || logFiles.length == 0) { + return 0L; + } long maxLogId = 0; - if (logFiles != null && logFiles.length > 0) { - for (int i = 0; i < logFiles.length; ++i) { - final Path logPath = logFiles[i].getPath(); - leaseRecovery.recoverFileLease(fs, logPath); - if (!isRunning()) { - throw new IOException("wal aborting"); - } + for (int i = 0; i < logFiles.length; ++i) { + final Path logPath = logFiles[i].getPath(); + leaseRecovery.recoverFileLease(fs, logPath); + if (!isRunning()) { + throw new IOException("wal aborting"); + } - maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); - if (log != null) { - this.logs.add(log); - } + maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); + ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); + if (log != null) { + this.logs.add(log); } - Collections.sort(this.logs); - initTrackerFromOldLogs(); } + initTrackerFromOldLogs(); return maxLogId; } /** - * If last log's tracker is not null, use it as {@link #storeTracker}. - * Otherwise, set storeTracker as partial, and let {@link ProcedureWALFormatReader} rebuild - * it using entries in the log. + * If last log's tracker is not null, use it as {@link #storeTracker}. Otherwise, set storeTracker + * as partial, and let {@link ProcedureWALFormatReader} rebuild it using entries in the log. 
*/ private void initTrackerFromOldLogs() { - if (logs.isEmpty() || !isRunning()) return; + if (logs.isEmpty() || !isRunning()) { + return; + } ProcedureWALFile log = logs.getLast(); if (!log.getTracker().isPartial()) { storeTracker.resetTo(log.getTracker()); @@ -1295,20 +1317,18 @@ public class WALProcedureStore extends ProcedureStoreBase { * Loads given log file and it's tracker. */ private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) - throws IOException { + throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { - LOG.warn("Remove uninitialized log: " + logFile); + LOG.warn("Remove uninitialized log: {}", logFile); log.removeFile(walArchiveDir); return null; } - if (LOG.isDebugEnabled()) { - LOG.debug("Opening Pv2 " + logFile); - } + LOG.debug("Opening Pv2 {}", logFile); try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { - LOG.warn("Remove uninitialized log: " + logFile, e); + LOG.warn("Remove uninitialized log: {}", logFile, e); log.removeFile(walArchiveDir); return null; } catch (IOException e) { @@ -1322,7 +1342,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (IOException e) { log.getTracker().reset(); log.getTracker().setPartialFlag(true); - LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); + LOG.warn("Unable to read tracker for {}", log, e); } log.close(); @@ -1350,7 +1370,7 @@ public class WALProcedureStore extends ProcedureStoreBase { }); try { store.start(16); - ProcedureExecutor pe = new ProcedureExecutor(conf, new Object()/*Pass anything*/, store); + ProcedureExecutor<?> pe = new ProcedureExecutor<>(conf, new Object()/*Pass anything*/, store); pe.init(1, true); } finally { store.stop(true); http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java 
---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java index ffc6ab8..d6b58d0 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/TestProcedureStoreTracker.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import java.util.Random; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.BitSetNode; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.ClassRule; @@ -119,29 +118,29 @@ public class TestProcedureStoreTracker { tracker.insert(procs[0]); tracker.insert(procs[1], new long[] { procs[2], procs[3], procs[4] }); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); - tracker.resetUpdates(); - assertFalse(tracker.isUpdated()); + tracker.resetModified(); + assertFalse(tracker.isAllModified()); for (int i = 0; i < 4; ++i) { tracker.update(procs[i]); assertFalse(tracker.isEmpty()); - assertFalse(tracker.isUpdated()); + assertFalse(tracker.isAllModified()); } tracker.update(procs[4]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); tracker.update(procs[5]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); for (int i = 0; i < 5; ++i) { tracker.delete(procs[i]); assertFalse(tracker.isEmpty()); - assertTrue(tracker.isUpdated()); + assertTrue(tracker.isAllModified()); } tracker.delete(procs[5]); assertTrue(tracker.isEmpty()); @@ -235,7 +234,7 @@ public class 
TestProcedureStoreTracker { for (long i : active) { tracker.insert(i); } - tracker.resetUpdates(); + tracker.resetModified(); for (long i : updated) { tracker.update(i); } @@ -252,11 +251,11 @@ public class TestProcedureStoreTracker { BitSetNode buildBitSetNode(long[] active, long[] updated, long[] deleted) { BitSetNode bitSetNode = new BitSetNode(0L, false); for (long i : active) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } - bitSetNode.resetUpdates(); + bitSetNode.resetModified(); for (long i : updated) { - bitSetNode.update(i); + bitSetNode.insertOrUpdate(i); } for (long i : deleted) { bitSetNode.delete(i); @@ -276,9 +275,9 @@ public class TestProcedureStoreTracker { assertEquals(false, tracker.isEmpty()); for (int i = 0; i < procIds.length; ++i) { - tracker.setDeletedIfSet(procIds[i] - 1); - tracker.setDeletedIfSet(procIds[i]); - tracker.setDeletedIfSet(procIds[i] + 1); + tracker.setDeletedIfModified(procIds[i] - 1); + tracker.setDeletedIfModified(procIds[i]); + tracker.setDeletedIfModified(procIds[i] + 1); } assertEquals(true, tracker.isEmpty()); @@ -289,7 +288,7 @@ public class TestProcedureStoreTracker { } assertEquals(false, tracker.isEmpty()); - tracker.setDeletedIfSet(procIds); + tracker.setDeletedIfModified(procIds); assertEquals(true, tracker.isEmpty()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index b1bd254..d682481 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -423,11 +423,11 @@ public class TestWALProcedureStore { final Procedure[] procs, final int[] updatedProcs, final int[] nonUpdatedProcs) { for (int index : updatedProcs) { long procId = procs[index].getProcId(); - assertTrue("Procedure id : " + procId, tracker.isUpdated(procId)); + assertTrue("Procedure id : " + procId, tracker.isModified(procId)); } for (int index : nonUpdatedProcs) { long procId = procs[index].getProcId(); - assertFalse("Procedure id : " + procId, tracker.isUpdated(procId)); + assertFalse("Procedure id : " + procId, tracker.isModified(procId)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 0de34a9..f9e3211 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; * avoiding port contention if another local HBase instance is already running). * <p>To preserve test data directories, pass the system property "hbase.testing.preserve.testdir" * setting it to true. + * For triggering test. */ @InterfaceAudience.Public @SuppressWarnings("deprecation")