HBASE-21250 Refactor WALProcedureStore and add more comments for better understanding the implementation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a300f3f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a300f3f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a300f3f Branch: refs/heads/branch-2.1 Commit: 5a300f3fc93f1e05731954aa829ec3e837338cef Parents: 9d34b45 Author: zhangduo <zhang...@apache.org> Authored: Sat Oct 6 17:27:05 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sun Oct 7 17:16:09 2018 +0800 ---------------------------------------------------------------------- .../hbase/procedure2/store/BitSetNode.java | 397 +++++++++++ .../procedure2/store/NoopProcedureStore.java | 9 +- .../hbase/procedure2/store/ProcedureStore.java | 9 +- .../procedure2/store/ProcedureStoreTracker.java | 502 +++---------- .../CorruptedWALProcedureStoreException.java | 6 +- .../procedure2/store/wal/ProcedureWALFile.java | 7 +- .../store/wal/ProcedureWALFormat.java | 38 +- .../store/wal/ProcedureWALFormatReader.java | 701 ++----------------- .../store/wal/ProcedureWALPrettyPrinter.java | 14 +- .../procedure2/store/wal/WALProcedureMap.java | 607 ++++++++++++++++ .../procedure2/store/wal/WALProcedureStore.java | 206 +++--- .../store/TestProcedureStoreTracker.java | 31 +- .../store/wal/TestWALProcedureStore.java | 4 +- .../hadoop/hbase/HBaseTestingUtility.java | 1 + 14 files changed, 1317 insertions(+), 1215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java new file mode 100644 index 0000000..b76c026 --- /dev/null +++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.procedure2.store; + +import java.util.Arrays; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). + * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range + * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is + * BITS_PER_WORD. + * <p/> + * We have two main bit sets to describe the state of procedures, the meanings are: + * + * <pre> + * ---------------------- + * | modified | deleted | meaning + * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates(). + * | 1 | 0 | proc was updated (but not deleted). + * | 1 | 1 | proc was deleted. + * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past). 
+ * ---------------------- + * </pre> + * + * The meaning of modified is that, we have modified the state of the procedure, no matter insert, + * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will + * set the deleted to 1. + * <p/> + * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the + * partial one, the initial modified value is 0 and the initial deleted value is also 0. In + * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified. + */ +@InterfaceAudience.Private +class BitSetNode { + private static final long WORD_MASK = 0xffffffffffffffffL; + private static final int ADDRESS_BITS_PER_WORD = 6; + private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; + private static final int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; + + /** + * Mimics {@link ProcedureStoreTracker#partial}. It will affect how we fill the new deleted bits + * when growing. + */ + private boolean partial; + + /** + * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track + * procedures which have been modified since last WAL write. + */ + private long[] modified; + + /** + * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This + * represents global state since it's not reset on WAL rolls. + */ + private long[] deleted; + /** + * Offset of bitmap i.e. procedure id corresponding to first bit. + */ + private long start; + + public void dump() { + System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(), + getActiveMaxProcId()); + System.out.println("Modified:"); + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((modified[i] & (1L << j)) != 0 ? 
"1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + System.out.println("Delete:"); + for (int i = 0; i < deleted.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); + } + System.out.println(" " + i); + } + System.out.println(); + } + + public BitSetNode(long procId, boolean partial) { + start = alignDown(procId); + + int count = 1; + modified = new long[count]; + deleted = new long[count]; + if (!partial) { + Arrays.fill(deleted, WORD_MASK); + } + + this.partial = partial; + updateState(procId, false); + } + + public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { + start = data.getStartId(); + int size = data.getUpdatedCount(); + assert size == data.getDeletedCount(); + modified = new long[size]; + deleted = new long[size]; + for (int i = 0; i < size; ++i) { + modified[i] = data.getUpdated(i); + deleted[i] = data.getDeleted(i); + } + partial = false; + } + + public BitSetNode(BitSetNode other, boolean resetDelete) { + this.start = other.start; + this.partial = other.partial; + this.modified = other.modified.clone(); + // The resetDelete will be set to true when building cleanup tracker. + // The intention here is that, if a procedure is not modified in this tracker, then we do not + // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is + // 0, then we set deleted to 1, otherwise keep it as is. 
So here, the equation is + // deleted |= ~modified. + if (resetDelete) { + this.deleted = new long[other.deleted.length]; + for (int i = 0; i < this.deleted.length; ++i) { + this.deleted[i] |= ~(other.modified[i]); + } + } else { + this.deleted = other.deleted.clone(); + } + } + + public void insertOrUpdate(final long procId) { + updateState(procId, false); + } + + public void delete(final long procId) { + updateState(procId, true); + } + + public long getStart() { + return start; + } + + public long getEnd() { + return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1; + } + + public boolean contains(final long procId) { + return start <= procId && procId <= getEnd(); + } + + public DeleteState isDeleted(final long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= deleted.length) { + return DeleteState.MAYBE; + } + return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; + } + + public boolean isModified(long procId) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + if (wordIndex >= modified.length) { + return false; + } + return (modified[wordIndex] & (1L << bitmapIndex)) != 0; + } + + /** + * @return true, if all the procedures have been modified. + */ + public boolean isAllModified() { + // TODO: cache the value + for (int i = 0; i < modified.length; ++i) { + if ((modified[i] | deleted[i]) != WORD_MASK) { + return false; + } + } + return true; + } + + /** + * @return true, if there are no active procedures in this BitSetNode, else false. 
+ */ + public boolean isEmpty() { + // TODO: cache the value + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] != WORD_MASK) { + return false; + } + } + return true; + } + + public void resetModified() { + Arrays.fill(modified, 0); + } + + public void unsetPartialFlag() { + partial = false; + for (int i = 0; i < modified.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((modified[i] & (1L << j)) == 0) { + deleted[i] |= (1L << j); + } + } + } + } + + /** + * Convert to + * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode + * protobuf. + */ + public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { + ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = + ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); + builder.setStartId(start); + for (int i = 0; i < modified.length; ++i) { + builder.addUpdated(modified[i]); + builder.addDeleted(deleted[i]); + } + return builder.build(); + } + + // ======================================================================== + // Grow/Merge Helpers + // ======================================================================== + public boolean canGrow(final long procId) { + return Math.abs(procId - start) < MAX_NODE_SIZE; + } + + public boolean canMerge(final BitSetNode rightNode) { + // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. 
+ assert start < rightNode.start; + return (rightNode.getEnd() - start) < MAX_NODE_SIZE; + } + + public void grow(final long procId) { + int delta, offset; + + if (procId < start) { + // add to head + long newStart = alignDown(procId); + delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD; + offset = delta; + start = newStart; + } else { + // Add to tail + long newEnd = alignUp(procId + 1); + delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; + offset = 0; + } + + long[] newBitmap; + int oldSize = modified.length; + + newBitmap = new long[oldSize + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = 0; + } + System.arraycopy(modified, 0, newBitmap, offset, oldSize); + modified = newBitmap; + + newBitmap = new long[deleted.length + delta]; + for (int i = 0; i < newBitmap.length; ++i) { + newBitmap[i] = partial ? 0 : WORD_MASK; + } + System.arraycopy(deleted, 0, newBitmap, offset, oldSize); + deleted = newBitmap; + } + + public void merge(final BitSetNode rightNode) { + int delta = (int) (rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; + + long[] newBitmap; + int oldSize = modified.length; + int newSize = (delta - rightNode.modified.length); + int offset = oldSize + newSize; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(modified, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.modified, 0, newBitmap, offset, rightNode.modified.length); + modified = newBitmap; + + newBitmap = new long[oldSize + delta]; + System.arraycopy(deleted, 0, newBitmap, 0, oldSize); + System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); + deleted = newBitmap; + + for (int i = 0; i < newSize; ++i) { + modified[offset + i] = 0; + deleted[offset + i] = partial ? 
0 : WORD_MASK; + } + } + + @Override + public String toString() { + return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; + } + + // ======================================================================== + // Min/Max Helpers + // ======================================================================== + public long getActiveMinProcId() { + long minProcId = start; + for (int i = 0; i < deleted.length; ++i) { + if (deleted[i] == 0) { + return (minProcId); + } + + if (deleted[i] != WORD_MASK) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + if ((deleted[i] & (1L << j)) != 0) { + return minProcId + j; + } + } + } + + minProcId += BITS_PER_WORD; + } + return minProcId; + } + + public long getActiveMaxProcId() { + long maxProcId = getEnd(); + for (int i = deleted.length - 1; i >= 0; --i) { + if (deleted[i] == 0) { + return maxProcId; + } + + if (deleted[i] != WORD_MASK) { + for (int j = BITS_PER_WORD - 1; j >= 0; --j) { + if ((deleted[i] & (1L << j)) == 0) { + return maxProcId - (BITS_PER_WORD - 1 - j); + } + } + } + maxProcId -= BITS_PER_WORD; + } + return maxProcId; + } + + // ======================================================================== + // Bitmap Helpers + // ======================================================================== + private int getBitmapIndex(final long procId) { + return (int) (procId - start); + } + + void updateState(long procId, boolean isDeleted) { + int bitmapIndex = getBitmapIndex(procId); + int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; + long value = (1L << bitmapIndex); + + modified[wordIndex] |= value; + if (isDeleted) { + deleted[wordIndex] |= value; + } else { + deleted[wordIndex] &= ~value; + } + } + + // ======================================================================== + // Helpers + // ======================================================================== + /** + * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. 
+ */ + private static long alignUp(final long x) { + return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; + } + + /** + * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. + */ + private static long alignDown(final long x) { + return x & -BITS_PER_WORD; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 9c6176d..8fbc147 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store; import java.io.IOException; @@ -64,17 +63,17 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void insert(Procedure proc, Procedure[] subprocs) { + public void insert(Procedure<?> proc, Procedure<?>[] subprocs) { // no-op } @Override - public void insert(Procedure[] proc) { + public void insert(Procedure<?>[] proc) { // no-op } @Override - public void update(Procedure proc) { + public void update(Procedure<?> proc) { // no-op } @@ -84,7 +83,7 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public void delete(Procedure proc, long[] subprocs) { + public void delete(Procedure<?> proc, long[] subprocs) { // no-op } http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index 7288340..8063b12 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -81,6 +81,7 @@ public interface ProcedureStore { * @throws IOException if there was an error fetching/deserializing the procedure * @return the next procedure in the iteration. */ + @SuppressWarnings("rawtypes") Procedure next() throws IOException; } @@ -173,7 +174,7 @@ public interface ProcedureStore { * @param proc the procedure to serialize and write to the store. * @param subprocs the newly created child of the proc. */ - void insert(Procedure proc, Procedure[] subprocs); + void insert(Procedure<?> proc, Procedure<?>[] subprocs); /** * Serialize a set of new procedures. 
@@ -182,14 +183,14 @@ public interface ProcedureStore { * * @param procs the procedures to serialize and write to the store. */ - void insert(Procedure[] procs); + void insert(Procedure<?>[] procs); /** * The specified procedure was executed, * and the new state should be written to the store. * @param proc the procedure to serialize and write to the store. */ - void update(Procedure proc); + void update(Procedure<?> proc); /** * The specified procId was removed from the executor, @@ -205,7 +206,7 @@ public interface ProcedureStore { * @param parentProc the parent procedure to serialize and write to the store. * @param subProcIds the IDs of the sub-procedure to remove. */ - void delete(Procedure parentProc, long[] subProcIds); + void delete(Procedure<?> parentProc, long[] subProcIds); /** * The specified procIds were removed from the executor, http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 2dad5ac..361419a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -53,383 +53,14 @@ public class ProcedureStoreTracker { * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to * understand it's real use. 
*/ - private boolean partial = false; + boolean partial = false; - private long minUpdatedProcId = Long.MAX_VALUE; - private long maxUpdatedProcId = Long.MIN_VALUE; + private long minModifiedProcId = Long.MAX_VALUE; + private long maxModifiedProcId = Long.MIN_VALUE; public enum DeleteState { YES, NO, MAYBE } - /** - * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met). - * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the - * range of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K - * is BITS_PER_WORD. - */ - public static class BitSetNode { - private final static long WORD_MASK = 0xffffffffffffffffL; - private final static int ADDRESS_BITS_PER_WORD = 6; - private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD; - private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD; - - /** - * Mimics {@link ProcedureStoreTracker#partial}. - */ - private final boolean partial; - - /* ---------------------- - * | updated | deleted | meaning - * | 0 | 0 | proc exists, but hasn't been updated since last resetUpdates(). - * | 1 | 0 | proc was updated (but not deleted). - * | 1 | 1 | proc was deleted. - * | 0 | 1 | proc doesn't exist (maybe never created, maybe deleted in past). - /* ---------------------- - */ - - /** - * Set of procedures which have been updated since last {@link #resetUpdates()}. - * Useful to track procedures which have been updated since last WAL write. - */ - private long[] updated; - /** - * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. - * This represents global state since it's not reset on WAL rolls. - */ - private long[] deleted; - /** - * Offset of bitmap i.e. procedure id corresponding to first bit. 
- */ - private long start; - - public void dump() { - System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), - getActiveMinProcId(), getActiveMaxProcId()); - System.out.println("Update:"); - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - System.out.println("Delete:"); - for (int i = 0; i < deleted.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0"); - } - System.out.println(" " + i); - } - System.out.println(); - } - - public BitSetNode(final long procId, final boolean partial) { - start = alignDown(procId); - - int count = 1; - updated = new long[count]; - deleted = new long[count]; - for (int i = 0; i < count; ++i) { - updated[i] = 0; - deleted[i] = partial ? 0 : WORD_MASK; - } - - this.partial = partial; - updateState(procId, false); - } - - protected BitSetNode(final long start, final long[] updated, final long[] deleted) { - this.start = start; - this.updated = updated; - this.deleted = deleted; - this.partial = false; - } - - public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) { - start = data.getStartId(); - int size = data.getUpdatedCount(); - updated = new long[size]; - deleted = new long[size]; - for (int i = 0; i < size; ++i) { - updated[i] = data.getUpdated(i); - deleted[i] = data.getDeleted(i); - } - partial = false; - } - - public BitSetNode(final BitSetNode other, final boolean resetDelete) { - this.start = other.start; - this.partial = other.partial; - this.updated = other.updated.clone(); - if (resetDelete) { - this.deleted = new long[other.deleted.length]; - for (int i = 0; i < this.deleted.length; ++i) { - this.deleted[i] = ~(other.updated[i]); - } - } else { - this.deleted = other.deleted.clone(); - } - } - - public void update(final long procId) { - updateState(procId, false); 
- } - - public void delete(final long procId) { - updateState(procId, true); - } - - public long getStart() { - return start; - } - - public long getEnd() { - return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1; - } - - public boolean contains(final long procId) { - return start <= procId && procId <= getEnd(); - } - - public DeleteState isDeleted(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= deleted.length) { - return DeleteState.MAYBE; - } - return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO; - } - - private boolean isUpdated(final long procId) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - if (wordIndex >= updated.length) { - return false; - } - return (updated[wordIndex] & (1L << bitmapIndex)) != 0; - } - - public boolean isUpdated() { - // TODO: cache the value - for (int i = 0; i < updated.length; ++i) { - if ((updated[i] | deleted[i]) != WORD_MASK) { - return false; - } - } - return true; - } - - /** - * @return true, if there are no active procedures in this BitSetNode, else false. - */ - public boolean isEmpty() { - // TODO: cache the value - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] != WORD_MASK) { - return false; - } - } - return true; - } - - public void resetUpdates() { - for (int i = 0; i < updated.length; ++i) { - updated[i] = 0; - } - } - - /** - * Clears the {@link #deleted} bitmaps. - */ - public void undeleteAll() { - for (int i = 0; i < updated.length; ++i) { - deleted[i] = 0; - } - } - - public void unsetPartialFlag() { - for (int i = 0; i < updated.length; ++i) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((updated[i] & (1L << j)) == 0) { - deleted[i] |= (1L << j); - } - } - } - } - - /** - * Convert to - * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode - * protobuf. 
- */ - public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() { - ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder = - ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder(); - builder.setStartId(start); - for (int i = 0; i < updated.length; ++i) { - builder.addUpdated(updated[i]); - builder.addDeleted(deleted[i]); - } - return builder.build(); - } - - // ======================================================================== - // Grow/Merge Helpers - // ======================================================================== - public boolean canGrow(final long procId) { - return Math.abs(procId - start) < MAX_NODE_SIZE; - } - - public boolean canMerge(final BitSetNode rightNode) { - // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD. - assert start < rightNode.start; - return (rightNode.getEnd() - start) < MAX_NODE_SIZE; - } - - public void grow(final long procId) { - int delta, offset; - - if (procId < start) { - // add to head - long newStart = alignDown(procId); - delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD; - offset = delta; - start = newStart; - } else { - // Add to tail - long newEnd = alignUp(procId + 1); - delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD; - offset = 0; - } - - long[] newBitmap; - int oldSize = updated.length; - - newBitmap = new long[oldSize + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = 0; - } - System.arraycopy(updated, 0, newBitmap, offset, oldSize); - updated = newBitmap; - - newBitmap = new long[deleted.length + delta]; - for (int i = 0; i < newBitmap.length; ++i) { - newBitmap[i] = partial ? 
0 : WORD_MASK; - } - System.arraycopy(deleted, 0, newBitmap, offset, oldSize); - deleted = newBitmap; - } - - public void merge(final BitSetNode rightNode) { - int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD; - - long[] newBitmap; - int oldSize = updated.length; - int newSize = (delta - rightNode.updated.length); - int offset = oldSize + newSize; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(updated, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length); - updated = newBitmap; - - newBitmap = new long[oldSize + delta]; - System.arraycopy(deleted, 0, newBitmap, 0, oldSize); - System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length); - deleted = newBitmap; - - for (int i = 0; i < newSize; ++i) { - updated[offset + i] = 0; - deleted[offset + i] = partial ? 0 : WORD_MASK; - } - } - - @Override - public String toString() { - return "BitSetNode(" + getStart() + "-" + getEnd() + ")"; - } - - // ======================================================================== - // Min/Max Helpers - // ======================================================================== - public long getActiveMinProcId() { - long minProcId = start; - for (int i = 0; i < deleted.length; ++i) { - if (deleted[i] == 0) { - return(minProcId); - } - - if (deleted[i] != WORD_MASK) { - for (int j = 0; j < BITS_PER_WORD; ++j) { - if ((deleted[i] & (1L << j)) != 0) { - return minProcId + j; - } - } - } - - minProcId += BITS_PER_WORD; - } - return minProcId; - } - - public long getActiveMaxProcId() { - long maxProcId = getEnd(); - for (int i = deleted.length - 1; i >= 0; --i) { - if (deleted[i] == 0) { - return maxProcId; - } - - if (deleted[i] != WORD_MASK) { - for (int j = BITS_PER_WORD - 1; j >= 0; --j) { - if ((deleted[i] & (1L << j)) == 0) { - return maxProcId - (BITS_PER_WORD - 1 - j); - } - } - } - maxProcId -= BITS_PER_WORD; - } - return maxProcId; - } - - // 
======================================================================== - // Bitmap Helpers - // ======================================================================== - private int getBitmapIndex(final long procId) { - return (int)(procId - start); - } - - private void updateState(final long procId, final boolean isDeleted) { - int bitmapIndex = getBitmapIndex(procId); - int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD; - long value = (1L << bitmapIndex); - - updated[wordIndex] |= value; - if (isDeleted) { - deleted[wordIndex] |= value; - } else { - deleted[wordIndex] &= ~value; - } - } - - - // ======================================================================== - // Helpers - // ======================================================================== - /** - * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignUp(final long x) { - return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD; - } - - /** - * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to. - */ - private static long alignDown(final long x) { - return x & -BITS_PER_WORD; - } - } - - public void resetToProto(final ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { + public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { reset(); for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { final BitSetNode node = new BitSetNode(protoNode); @@ -440,14 +71,23 @@ public class ProcedureStoreTracker { /** * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. 
*/ - public void resetTo(final ProcedureStoreTracker tracker) { + public void resetTo(ProcedureStoreTracker tracker) { resetTo(tracker, false); } - public void resetTo(final ProcedureStoreTracker tracker, final boolean resetDelete) { + /** + * Resets internal state to same as given {@code tracker}, and change the deleted flag according + * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap. + * <p/> + * The {@code resetDelete} will be set to true when building cleanup tracker, please see the + * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the + * deleted flag if {@code resetDelete} is true. + */ + public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { + reset(); this.partial = tracker.partial; - this.minUpdatedProcId = tracker.minUpdatedProcId; - this.maxUpdatedProcId = tracker.maxUpdatedProcId; + this.minModifiedProcId = tracker.minModifiedProcId; + this.maxModifiedProcId = tracker.maxModifiedProcId; this.keepDeletes = tracker.keepDeletes; for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) { map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); @@ -458,25 +98,24 @@ public class ProcedureStoreTracker { insert(null, procId); } - public void insert(final long[] procIds) { + public void insert(long[] procIds) { for (int i = 0; i < procIds.length; ++i) { insert(procIds[i]); } } - public void insert(final long procId, final long[] subProcIds) { - BitSetNode node = null; - node = update(node, procId); + public void insert(long procId, long[] subProcIds) { + BitSetNode node = update(null, procId); for (int i = 0; i < subProcIds.length; ++i) { node = insert(node, subProcIds[i]); } } - private BitSetNode insert(BitSetNode node, final long procId) { + private BitSetNode insert(BitSetNode node, long procId) { if (node == null || !node.contains(procId)) { node = getOrCreateNode(procId); } - node.update(procId); + node.insertOrUpdate(procId); 
trackProcIds(procId); return node; } @@ -485,11 +124,11 @@ public class ProcedureStoreTracker { update(null, procId); } - private BitSetNode update(BitSetNode node, final long procId) { + private BitSetNode update(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to update procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; - node.update(procId); + node.insertOrUpdate(procId); trackProcIds(procId); return node; } @@ -506,7 +145,7 @@ public class ProcedureStoreTracker { } } - private BitSetNode delete(BitSetNode node, final long procId) { + private BitSetNode delete(BitSetNode node, long procId) { node = lookupClosestNode(node, procId); assert node != null : "expected node to delete procId=" + procId; assert node.contains(procId) : "expected procId=" + procId + " in the node"; @@ -520,35 +159,62 @@ public class ProcedureStoreTracker { return node; } - @InterfaceAudience.Private - public void setDeleted(final long procId, final boolean isDeleted) { + /** + * Will be called when restarting where we need to rebuild the ProcedureStoreTracker. + */ + public void setMinMaxModifiedProcIds(long min, long max) { + this.minModifiedProcId = min; + this.maxModifiedProcId = max; + } + /** + * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The + * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart + * this is not true, as we will read the wal files in reverse order so a delete may come first. + */ + public void setDeleted(long procId, boolean isDeleted) { BitSetNode node = getOrCreateNode(procId); assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; node.updateState(procId, isDeleted); trackProcIds(procId); } - public void setDeletedIfSet(final long... procId) { + /** + * Set the given bit for the procId to delete if it was modified before. 
+ * <p/> + * This method is used to test whether a procedure wal file can be safely deleted, as if all the + * procedures in the given procedure wal file has been modified in the new procedure wal files, + * then we can delete it. + */ + public void setDeletedIfModified(long... procId) { BitSetNode node = null; for (int i = 0; i < procId.length; ++i) { node = lookupClosestNode(node, procId[i]); - if (node != null && node.isUpdated(procId[i])) { + if (node != null && node.isModified(procId[i])) { node.delete(procId[i]); } } } - public void setDeletedIfSet(final ProcedureStoreTracker tracker) { + /** + * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by + * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, + * then we mark it as deleted. + * @see #setDeletedIfModified(long...) + */ + public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { BitSetNode trackerNode = null; - for (BitSetNode node: map.values()) { + for (BitSetNode node : map.values()) { final long minProcId = node.getStart(); final long maxProcId = node.getEnd(); for (long procId = minProcId; procId <= maxProcId; ++procId) { - if (!node.isUpdated(procId)) continue; + if (!node.isModified(procId)) { + continue; + } trackerNode = tracker.lookupClosestNode(trackerNode, procId); - if (trackerNode == null || !trackerNode.contains(procId) || trackerNode.isUpdated(procId)) { - // the procedure was removed or updated + if (trackerNode == null || !trackerNode.contains(procId) || + trackerNode.isModified(procId)) { + // the procedure was removed or modified node.delete(procId); } } @@ -568,28 +234,29 @@ public class ProcedureStoreTracker { } private void trackProcIds(long procId) { - minUpdatedProcId = Math.min(minUpdatedProcId, procId); - maxUpdatedProcId = Math.max(maxUpdatedProcId, procId); + minModifiedProcId = Math.min(minModifiedProcId, procId); + maxModifiedProcId = Math.max(maxModifiedProcId, procId); } - 
public long getUpdatedMinProcId() { - return minUpdatedProcId; + public long getModifiedMinProcId() { + return minModifiedProcId; } - public long getUpdatedMaxProcId() { - return maxUpdatedProcId; + public long getModifiedMaxProcId() { + return maxModifiedProcId; } public void reset() { this.keepDeletes = false; this.partial = false; this.map.clear(); - resetUpdates(); + resetModified(); } - public boolean isUpdated(long procId) { + public boolean isModified(long procId) { final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); - return entry != null && entry.getValue().contains(procId) && entry.getValue().isUpdated(procId); + return entry != null && entry.getValue().contains(procId) && + entry.getValue().isModified(procId); } /** @@ -604,7 +271,7 @@ public class ProcedureStoreTracker { if (entry != null && entry.getValue().contains(procId)) { BitSetNode node = entry.getValue(); DeleteState state = node.isDeleted(procId); - return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state; + return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; } return partial ? DeleteState.MAYBE : DeleteState.YES; } @@ -656,11 +323,12 @@ public class ProcedureStoreTracker { } /** - * @return true if any procedure was updated since last call to {@link #resetUpdates()}. + * @return true if all procedure was modified or deleted since last call to + * {@link #resetModified()}. */ - public boolean isUpdated() { + public boolean isAllModified() { for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { - if (!entry.getValue().isUpdated()) { + if (!entry.getValue().isAllModified()) { return false; } } @@ -671,21 +339,15 @@ public class ProcedureStoreTracker { * Clears the list of updated procedure ids. This doesn't affect global list of active * procedure ids. 
*/ - public void resetUpdates() { - for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { - entry.getValue().resetUpdates(); - } - minUpdatedProcId = Long.MAX_VALUE; - maxUpdatedProcId = Long.MIN_VALUE; - } - - public void undeleteAll() { + public void resetModified() { for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { - entry.getValue().undeleteAll(); + entry.getValue().resetModified(); } + minModifiedProcId = Long.MAX_VALUE; + maxModifiedProcId = Long.MIN_VALUE; } - private BitSetNode getOrCreateNode(final long procId) { + private BitSetNode getOrCreateNode(long procId) { // If procId can fit in left node (directly or by growing it) BitSetNode leftNode = null; boolean leftCanGrow = false; @@ -760,7 +422,7 @@ public class ProcedureStoreTracker { public void dump() { System.out.println("map " + map.size()); - System.out.println("isUpdated " + isUpdated()); + System.out.println("isAllModified " + isAllModified()); System.out.println("isEmpty " + isEmpty()); for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { entry.getValue().dump(); http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java index dd34896..ba4480f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/CorruptedWALProcedureStoreException.java @@ -15,19 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; /** * Thrown when a procedure WAL is corrupted */ @InterfaceAudience.Private -@InterfaceStability.Stable public class CorruptedWALProcedureStoreException extends HBaseIOException { + + private static final long serialVersionUID = -3407300445435898074L; + /** default constructor */ public CorruptedWALProcedureStoreException() { super(); http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 6226350..1676744 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -15,20 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.hadoop.hbase.procedure2.store.wal; import java.io.IOException; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -37,7 +35,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Describes a WAL File */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class ProcedureWALFile implements Comparable<ProcedureWALFile> { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class); http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index ac3a529..c9986ed 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -18,25 +18,22 @@ package org.apache.hadoop.hbase.procedure2.store.wal; -import 
org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.io.util.StreamUtils; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader; import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.hadoop.hbase.procedure2.util.ByteSlot; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALTrailer; @@ -45,9 +42,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that contains the WAL serialization utils. 
*/ @InterfaceAudience.Private -@InterfaceStability.Evolving public final class ProcedureWALFormat { - private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormat.class); static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; @@ -60,6 +55,9 @@ public final class ProcedureWALFormat { @InterfaceAudience.Private public static class InvalidWALDataException extends IOException { + + private static final long serialVersionUID = 5471733223070202196L; + public InvalidWALDataException(String s) { super(s); } @@ -75,9 +73,9 @@ public final class ProcedureWALFormat { private ProcedureWALFormat() {} - public static void load(final Iterator<ProcedureWALFile> logs, - final ProcedureStoreTracker tracker, final Loader loader) throws IOException { - final ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); + public static void load(Iterator<ProcedureWALFile> logs, ProcedureStoreTracker tracker, + Loader loader) throws IOException { + ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, loader); tracker.setKeepDeletes(true); try { // Ignore the last log which is current active log. 
@@ -93,8 +91,10 @@ public final class ProcedureWALFormat { reader.finish(); // The tracker is now updated with all the procedures read from the logs - tracker.setPartialFlag(false); - tracker.resetUpdates(); + if (tracker.isPartial()) { + tracker.setPartialFlag(false); + } + tracker.resetModified(); } finally { tracker.setKeepDeletes(false); } @@ -205,7 +205,7 @@ public final class ProcedureWALFormat { } public static void writeEntry(ByteSlot slot, ProcedureWALEntry.Type type, - Procedure proc, Procedure[] subprocs) throws IOException { + Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); builder.setType(type); builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); @@ -217,17 +217,17 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeInsert(ByteSlot slot, Procedure proc) + public static void writeInsert(ByteSlot slot, Procedure<?> proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INIT, proc, null); } - public static void writeInsert(ByteSlot slot, Procedure proc, Procedure[] subprocs) + public static void writeInsert(ByteSlot slot, Procedure<?> proc, Procedure<?>[] subprocs) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_INSERT, proc, subprocs); } - public static void writeUpdate(ByteSlot slot, Procedure proc) + public static void writeUpdate(ByteSlot slot, Procedure<?> proc) throws IOException { writeEntry(slot, ProcedureWALEntry.Type.PROCEDURE_WAL_UPDATE, proc, null); } @@ -240,7 +240,7 @@ public final class ProcedureWALFormat { builder.build().writeDelimitedTo(slot); } - public static void writeDelete(ByteSlot slot, Procedure proc, long[] subprocs) + public static void writeDelete(ByteSlot slot, Procedure<?> proc, long[] subprocs) throws IOException { final ProcedureWALEntry.Builder builder = ProcedureWALEntry.newBuilder(); 
builder.setType(ProcedureWALEntry.Type.PROCEDURE_WAL_DELETE); http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 4ab70f1..1ac8e01 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -15,22 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2.store.wal; -import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; - import java.io.IOException; - import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; -import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; @@ -38,7 +34,6 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu * Helper class that loads the procedures stored in a WAL */ @InterfaceAudience.Private -@InterfaceStability.Evolving public class ProcedureWALFormatReader { private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class); @@ -98,8 +93,8 @@ public class ProcedureWALFormatReader { // In the case above we need to read one more WAL to be able to consider // the root procedure A and all children as ready. // ============================================================================================== - private final WalProcedureMap localProcedureMap = new WalProcedureMap(1024); - private final WalProcedureMap procedureMap = new WalProcedureMap(1024); + private final WALProcedureMap localProcedureMap = new WALProcedureMap(1024); + private final WALProcedureMap procedureMap = new WALProcedureMap(1024); private final ProcedureWALFormat.Loader loader; @@ -111,33 +106,31 @@ public class ProcedureWALFormatReader { * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // TODO: private final boolean hasFastStartSupport; /** - * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we - * re-build the list of procedures updated in that WAL because we need it for log cleaning - * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. - * (see {@link WALProcedureStore#removeInactiveLogs()}). - * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother - * re-building it. + * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build + * the list of procedures modified in that WAL because we need it for log cleaning purposes. If + * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see + * {@link WALProcedureStore#removeInactiveLogs()}). 
+ * <p/> + * Notice that, the deleted part for this tracker will not be global valid as we can only count + * the deletes in the current file, but it is not big problem as finally, the above tracker will + * have the global state of deleted, and it will also be used to build the cleanup tracker. */ private ProcedureStoreTracker localTracker; - // private long compactionLogId; private long maxProcId = 0; public ProcedureWALFormatReader(final ProcedureStoreTracker tracker, ProcedureWALFormat.Loader loader) { this.tracker = tracker; this.loader = loader; - // we support fast-start only if we have a clean shutdown. - // this.hasFastStartSupport = !tracker.isEmpty(); } - public void read(final ProcedureWALFile log) throws IOException { - localTracker = log.getTracker().isPartial() ? log.getTracker() : null; - if (localTracker != null) { - LOG.info("Rebuilding tracker for " + log); + public void read(ProcedureWALFile log) throws IOException { + localTracker = log.getTracker(); + if (localTracker.isPartial()) { + LOG.info("Rebuilding tracker for {}", log); } long count = 0; @@ -147,7 +140,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); + LOG.warn("Nothing left to decode. 
Exiting with missing EOF, log={}", log); break; } count++; @@ -178,21 +171,17 @@ public class ProcedureWALFormatReader { loader.markCorruptedWAL(log, e); } - if (localTracker != null) { - localTracker.setPartialFlag(false); - } if (!localProcedureMap.isEmpty()) { - log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); + log.setProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + if (localTracker.isPartial()) { + localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(), + localProcedureMap.getMaxModifiedProcId()); + } procedureMap.mergeTail(localProcedureMap); - - //if (hasFastStartSupport) { - // TODO: Some procedure may be already runnables (see readInitEntry()) - // (we can also check the "update map" in the log trackers) - // -------------------------------------------------- - //EntryIterator iter = procedureMap.fetchReady(); - //if (iter != null) loader.load(iter); - // -------------------------------------------------- - //} + } + if (localTracker.isPartial()) { + localTracker.setPartialFlag(false); } } @@ -202,37 +191,46 @@ public class ProcedureWALFormatReader { // fetch the procedure ready to run. ProcedureIterator procIter = procedureMap.fetchReady(); - if (procIter != null) loader.load(procIter); + if (procIter != null) { + loader.load(procIter); + } // remaining procedures have missing link or dependencies // consider them as corrupted, manual fix is probably required. 
procIter = procedureMap.fetchAll(); - if (procIter != null) loader.handleCorrupted(procIter); + if (procIter != null) { + loader.handleCorrupted(procIter); + } + } + + private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) { + if (tracker.isPartial()) { + tracker.setDeleted(procId, true); + } } - private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { + private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) { + if (tracker.isPartial()) { + tracker.insert(proc.getProcId()); + } + } + + private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) { maxProcId = Math.max(maxProcId, proc.getProcId()); if (isRequired(proc.getProcId())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId()); - } + LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId()); localProcedureMap.add(proc); - if (tracker.isPartial()) { - tracker.insert(proc.getProcId()); - } - } - if (localTracker != null) { - localTracker.insert(proc.getProcId()); + insertIfPartial(tracker, proc); } + insertIfPartial(localTracker, proc); } - private void readInitEntry(final ProcedureWALEntry entry) - throws IOException { + private void readInitEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readInsertEntry(final ProcedureWALEntry entry) throws IOException { + private void readInsertEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() >= 1 : "Expected one or more procedures"; loadProcedure(entry, entry.getProcedure(0)); for (int i = 1; i < entry.getProcedureCount(); ++i) { @@ -240,12 +238,12 @@ public class ProcedureWALFormatReader { } } - private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException { + private void readUpdateEntry(ProcedureWALEntry entry) { assert entry.getProcedureCount() 
== 1 : "Expected only one procedure"; loadProcedure(entry, entry.getProcedure(0)); } - private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException { + private void readDeleteEntry(ProcedureWALEntry entry) { assert entry.hasProcId() : "expected ProcID"; if (entry.getChildIdCount() > 0) { @@ -267,598 +265,19 @@ public class ProcedureWALFormatReader { } private void deleteEntry(final long procId) { - if (LOG.isTraceEnabled()) { - LOG.trace("delete entry " + procId); - } + LOG.trace("delete entry {}", procId); maxProcId = Math.max(maxProcId, procId); localProcedureMap.remove(procId); assert !procedureMap.contains(procId); - if (tracker.isPartial()) { - tracker.setDeleted(procId, true); - } - if (localTracker != null) { - // In case there is only delete entry for this procedure in current log. - localTracker.setDeleted(procId, true); - } + setDeletedIfPartial(tracker, procId); + setDeletedIfPartial(localTracker, procId); } - private boolean isDeleted(final long procId) { + private boolean isDeleted(long procId) { return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES; } - private boolean isRequired(final long procId) { + private boolean isRequired(long procId) { return !isDeleted(procId) && !procedureMap.contains(procId); } - - // ========================================================================== - // We keep an in-memory map of the procedures sorted by replay order. - // (see the details in the beginning of the file) - // _______________________________________________ - // procedureMap = | A | | E | | C | | | | | G | | | - // D B - // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G - // - // We also have a lazy grouping by "root procedure", and a list of - // unlinked procedures. If after reading all the WALs we have unlinked - // procedures it means that we had a missing WAL or a corruption. 
- // rootHead = A <-> D <-> G - // B E - // C - // unlinkFromLinkList = None - // ========================================================================== - private static class Entry { - // For bucketed linked lists in hash-table. - protected Entry hashNext; - // child head - protected Entry childHead; - // double-link for rootHead or childHead - protected Entry linkNext; - protected Entry linkPrev; - // replay double-linked-list - protected Entry replayNext; - protected Entry replayPrev; - // procedure-infos - protected Procedure procedure; - protected ProcedureProtos.Procedure proto; - protected boolean ready = false; - - public Entry(Entry hashNext) { - this.hashNext = hashNext; - } - - public long getProcId() { - return proto.getProcId(); - } - - public long getParentId() { - return proto.getParentId(); - } - - public boolean hasParent() { - return proto.hasParentId(); - } - - public boolean isReady() { - return ready; - } - - public boolean isFinished() { - if (!hasParent()) { - // we only consider 'root' procedures. because for the user 'finished' - // means when everything up to the 'root' is finished. 
- switch (proto.getState()) { - case ROLLEDBACK: - case SUCCESS: - return true; - default: - break; - } - } - return false; - } - - public Procedure convert() throws IOException { - if (procedure == null) { - procedure = ProcedureUtil.convertToProcedure(proto); - } - return procedure; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Entry("); - sb.append(getProcId()); - sb.append(", parentId="); - sb.append(getParentId()); - sb.append(", class="); - sb.append(proto.getClassName()); - sb.append(")"); - return sb.toString(); - } - } - - private static class EntryIterator implements ProcedureIterator { - private final Entry replayHead; - private Entry current; - - public EntryIterator(Entry replayHead) { - this.replayHead = replayHead; - this.current = replayHead; - } - - @Override - public void reset() { - this.current = replayHead; - } - - @Override - public boolean hasNext() { - return current != null; - } - - @Override - public boolean isNextFinished() { - return current != null && current.isFinished(); - } - - @Override - public void skipNext() { - current = current.replayNext; - } - - @Override - public Procedure next() throws IOException { - try { - return current.convert(); - } finally { - current = current.replayNext; - } - } - } - - private static class WalProcedureMap { - // procedure hash table - private Entry[] procedureMap; - - // replay-order double-linked-list - private Entry replayOrderHead; - private Entry replayOrderTail; - - // root linked-list - private Entry rootHead; - - // pending unlinked children (root not present yet) - private Entry childUnlinkedHead; - - // Track ProcId range - private long minProcId = Long.MAX_VALUE; - private long maxProcId = Long.MIN_VALUE; - - public WalProcedureMap(int size) { - procedureMap = new Entry[size]; - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - } - - public void add(ProcedureProtos.Procedure 
procProto) { - trackProcIds(procProto.getProcId()); - Entry entry = addToMap(procProto.getProcId(), procProto.hasParentId()); - boolean newEntry = entry.proto == null; - // We have seen procedure WALs where the entries are out of order; see HBASE-18152. - // To compensate, only replace the Entry procedure if for sure this new procedure - // is indeed an entry that came later. TODO: Fix the writing of procedure info so - // it does not violate basic expectation, that WALs contain procedure changes going - // from start to finish in sequence. - if (newEntry || isIncreasing(entry.proto, procProto)) { - entry.proto = procProto; - } - addToReplayList(entry); - if(newEntry) { - if (procProto.hasParentId()) { - childUnlinkedHead = addToLinkList(entry, childUnlinkedHead); - } else { - rootHead = addToLinkList(entry, rootHead); - } - } - } - - /** - * @return True if this new procedure is 'richer' than the current one else - * false and we log this incidence where it appears that the WAL has older entries - * appended after newer ones. See HBASE-18152. - */ - private static boolean isIncreasing(ProcedureProtos.Procedure current, - ProcedureProtos.Procedure candidate) { - // Check that the procedures we see are 'increasing'. We used to compare - // procedure id first and then update time but it can legitimately go backwards if the - // procedure is failed or rolled back so that was unreliable. Was going to compare - // state but lets see if comparing update time enough (unfortunately this issue only - // seen under load...) - boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); - if (!increasing) { - LOG.warn("NOT INCREASING! 
current=" + current + ", candidate=" + candidate); - } - return increasing; - } - - public boolean remove(long procId) { - trackProcIds(procId); - Entry entry = removeFromMap(procId); - if (entry != null) { - unlinkFromReplayList(entry); - unlinkFromLinkList(entry); - return true; - } - return false; - } - - private void trackProcIds(long procId) { - minProcId = Math.min(minProcId, procId); - maxProcId = Math.max(maxProcId, procId); - } - - public long getMinProcId() { - return minProcId; - } - - public long getMaxProcId() { - return maxProcId; - } - - public boolean contains(long procId) { - return getProcedure(procId) != null; - } - - public boolean isEmpty() { - return replayOrderHead == null; - } - - public void clear() { - for (int i = 0; i < procedureMap.length; ++i) { - procedureMap[i] = null; - } - replayOrderHead = null; - replayOrderTail = null; - rootHead = null; - childUnlinkedHead = null; - minProcId = Long.MAX_VALUE; - maxProcId = Long.MIN_VALUE; - } - - /* - * Merges two WalProcedureMap, - * the target is the "global" map, the source is the "local" map. - * - The entries in the hashtables are guaranteed to be unique. - * On replay we don't load procedures that already exist in the "global" - * map (the one we are merging the "local" in to). - * - The replayOrderList of the "local" nao will be appended to the "global" - * map replay list. - * - The "local" map will be cleared at the end of the operation. 
- */ - public void mergeTail(WalProcedureMap other) { - for (Entry p = other.replayOrderHead; p != null; p = p.replayNext) { - int slotIndex = getMapSlot(p.getProcId()); - p.hashNext = procedureMap[slotIndex]; - procedureMap[slotIndex] = p; - } - - if (replayOrderHead == null) { - replayOrderHead = other.replayOrderHead; - replayOrderTail = other.replayOrderTail; - rootHead = other.rootHead; - childUnlinkedHead = other.childUnlinkedHead; - } else { - // append replay list - assert replayOrderTail.replayNext == null; - assert other.replayOrderHead.replayPrev == null; - replayOrderTail.replayNext = other.replayOrderHead; - other.replayOrderHead.replayPrev = replayOrderTail; - replayOrderTail = other.replayOrderTail; - - // merge rootHead - if (rootHead == null) { - rootHead = other.rootHead; - } else if (other.rootHead != null) { - Entry otherTail = findLinkListTail(other.rootHead); - otherTail.linkNext = rootHead; - rootHead.linkPrev = otherTail; - rootHead = other.rootHead; - } - - // merge childUnlinkedHead - if (childUnlinkedHead == null) { - childUnlinkedHead = other.childUnlinkedHead; - } else if (other.childUnlinkedHead != null) { - Entry otherTail = findLinkListTail(other.childUnlinkedHead); - otherTail.linkNext = childUnlinkedHead; - childUnlinkedHead.linkPrev = otherTail; - childUnlinkedHead = other.childUnlinkedHead; - } - } - maxProcId = Math.max(maxProcId, other.maxProcId); - minProcId = Math.max(minProcId, other.minProcId); - - other.clear(); - } - - /* - * Returns an EntryIterator with the list of procedures ready - * to be added to the executor. - * A Procedure is ready if its children and parent are ready. 
- */ - public EntryIterator fetchReady() { - buildGraph(); - - Entry readyHead = null; - Entry readyTail = null; - Entry p = replayOrderHead; - while (p != null) { - Entry next = p.replayNext; - if (p.isReady()) { - unlinkFromReplayList(p); - if (readyTail != null) { - readyTail.replayNext = p; - p.replayPrev = readyTail; - } else { - p.replayPrev = null; - readyHead = p; - } - readyTail = p; - p.replayNext = null; - } - p = next; - } - // we need the hash-table lookups for parents, so this must be done - // out of the loop where we check isReadyToRun() - for (p = readyHead; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - unlinkFromLinkList(p); - } - return readyHead != null ? new EntryIterator(readyHead) : null; - } - - /* - * Drain this map and return all procedures in it. - */ - public EntryIterator fetchAll() { - Entry head = replayOrderHead; - for (Entry p = head; p != null; p = p.replayNext) { - removeFromMap(p.getProcId()); - } - for (int i = 0; i < procedureMap.length; ++i) { - assert procedureMap[i] == null : "map not empty i=" + i; - } - replayOrderHead = null; - replayOrderTail = null; - childUnlinkedHead = null; - rootHead = null; - return head != null ? new EntryIterator(head) : null; - } - - private void buildGraph() { - Entry p = childUnlinkedHead; - while (p != null) { - Entry next = p.linkNext; - Entry rootProc = getRootProcedure(p); - if (rootProc != null) { - rootProc.childHead = addToLinkList(p, rootProc.childHead); - } - p = next; - } - - for (p = rootHead; p != null; p = p.linkNext) { - checkReadyToRun(p); - } - } - - private Entry getRootProcedure(Entry entry) { - while (entry != null && entry.hasParent()) { - entry = getProcedure(entry.getParentId()); - } - return entry; - } - - /* - * (see the comprehensive explanation in the beginning of the file) - * A Procedure is ready when parent and children are ready. - * "ready" means that we all the information that we need in-memory. 
- * - * Example-1: - * We have two WALs, we start reading from the newest (wal-2) - * wal-2 | C B | - * wal-1 | A B C | - * - * If C and B don't depend on A (A is not the parent), we can start them - * before reading wal-1. If B is the only one with parent A we can start C. - * We have to read one more WAL before being able to start B. - * - * How do we know with the only information in B that we are not ready. - * - easy case, the parent is missing from the global map - * - more complex case we look at the Stack IDs. - * - * The Stack-IDs are added to the procedure order as an incremental index - * tracking how many times that procedure was executed, which is equivalent - * to the number of times we wrote the procedure to the WAL. - * In the example above: - * wal-2: B has stackId = [1, 2] - * wal-1: B has stackId = [1] - * wal-1: A has stackId = [0] - * - * Since we know that the Stack-IDs are incremental for a Procedure, - * we notice that there is a gap in the stackIds of B, so something was - * executed before. - * To identify when a Procedure is ready we do the sum of the stackIds of - * the procedure and the parent. if the stackIdSum is equal to the - * sum of {1..maxStackId} then everything we need is available. - * - * Example-2 - * wal-2 | A | A stackIds = [0, 2] - * wal-1 | A B | B stackIds = [1] - * - * There is a gap between A stackIds so something was executed in between. - */ - private boolean checkReadyToRun(Entry rootEntry) { - assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; - - if (rootEntry.isFinished()) { - // If the root procedure is finished, sub-procedures should be gone - if (rootEntry.childHead != null) { - LOG.error("unexpected active children for root-procedure: " + rootEntry); - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - LOG.error("unexpected active children: " + p); - } - } - - assert rootEntry.childHead == null : "unexpected children on root completion. 
" + rootEntry; - rootEntry.ready = true; - return true; - } - - int stackIdSum = 0; - int maxStackId = 0; - for (int i = 0; i < rootEntry.proto.getStackIdCount(); ++i) { - int stackId = 1 + rootEntry.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - if (LOG.isTraceEnabled()) { - LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum + - " maxStackid=" + maxStackId + " " + rootEntry); - } - } - - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - for (int i = 0; i < p.proto.getStackIdCount(); ++i) { - int stackId = 1 + p.proto.getStackId(i); - maxStackId = Math.max(maxStackId, stackId); - stackIdSum += stackId; - if (LOG.isTraceEnabled()) { - LOG.trace("stackId=" + stackId + " stackIdSum=" + stackIdSum + - " maxStackid=" + maxStackId + " " + p); - } - } - } - // The cmpStackIdSum is this formula for finding the sum of a series of numbers: - // http://www.wikihow.com/Sum-the-Integers-from-1-to-N#/Image:Sum-the-Integers-from-1-to-N-Step-2-Version-3.jpg - final int cmpStackIdSum = (maxStackId * (maxStackId + 1) / 2); - if (cmpStackIdSum == stackIdSum) { - rootEntry.ready = true; - for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { - p.ready = true; - } - return true; - } - return false; - } - - private void unlinkFromReplayList(Entry entry) { - if (replayOrderHead == entry) { - replayOrderHead = entry.replayNext; - } - if (replayOrderTail == entry) { - replayOrderTail = entry.replayPrev; - } - if (entry.replayPrev != null) { - entry.replayPrev.replayNext = entry.replayNext; - } - if (entry.replayNext != null) { - entry.replayNext.replayPrev = entry.replayPrev; - } - } - - private void addToReplayList(final Entry entry) { - unlinkFromReplayList(entry); - entry.replayNext = replayOrderHead; - entry.replayPrev = null; - if (replayOrderHead != null) { - replayOrderHead.replayPrev = entry; - } else { - replayOrderTail = entry; - } - replayOrderHead = entry; - } - - private void 
unlinkFromLinkList(Entry entry) { - if (entry == rootHead) { - rootHead = entry.linkNext; - } else if (entry == childUnlinkedHead) { - childUnlinkedHead = entry.linkNext; - } - if (entry.linkPrev != null) { - entry.linkPrev.linkNext = entry.linkNext; - } - if (entry.linkNext != null) { - entry.linkNext.linkPrev = entry.linkPrev; - } - } - - private Entry addToLinkList(Entry entry, Entry linkHead) { - unlinkFromLinkList(entry); - entry.linkNext = linkHead; - entry.linkPrev = null; - if (linkHead != null) { - linkHead.linkPrev = entry; - } - return entry; - } - - private Entry findLinkListTail(Entry linkHead) { - Entry tail = linkHead; - while (tail.linkNext != null) { - tail = tail.linkNext; - } - return tail; - } - - private Entry addToMap(final long procId, final boolean hasParent) { - int slotIndex = getMapSlot(procId); - Entry entry = getProcedure(slotIndex, procId); - if (entry != null) return entry; - - entry = new Entry(procedureMap[slotIndex]); - procedureMap[slotIndex] = entry; - return entry; - } - - private Entry removeFromMap(final long procId) { - int slotIndex = getMapSlot(procId); - Entry prev = null; - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - if (prev != null) { - prev.hashNext = entry.hashNext; - } else { - procedureMap[slotIndex] = entry.hashNext; - } - entry.hashNext = null; - return entry; - } - prev = entry; - entry = entry.hashNext; - } - return null; - } - - private Entry getProcedure(final long procId) { - return getProcedure(getMapSlot(procId), procId); - } - - private Entry getProcedure(final int slotIndex, final long procId) { - Entry entry = procedureMap[slotIndex]; - while (entry != null) { - if (procId == entry.getProcId()) { - return entry; - } - entry = entry.hashNext; - } - return null; - } - - private int getMapSlot(final long procId) { - return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length); - } - } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a300f3f/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java index a7712b1..a11a46b 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPrettyPrinter.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; @@ -30,20 +29,21 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser; import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter; import 
org.apache.hbase.thirdparty.org.apache.commons.cli.Options; import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException; -import org.apache.hbase.thirdparty.org.apache.commons.cli.PosixParser; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader; /** * ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file @@ -160,7 +160,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool { final List<Path> files = new ArrayList<>(); try { - CommandLine cmd = new PosixParser().parse(options, args); + CommandLine cmd = new DefaultParser().parse(options, args); if (cmd.hasOption("f")) { files.add(new Path(cmd.getOptionValue("f")));