Repository: hbase Updated Branches: refs/heads/master f7d0f15c9 -> 2f8ddf6fc
HBASE-17434: New synchronization scheme for compaction pipeline Signed-off-by: Michael Stack <st...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2f8ddf6f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2f8ddf6f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2f8ddf6f Branch: refs/heads/master Commit: 2f8ddf6fc5f904f0273b07469286e01aa02c7da5 Parents: f7d0f15 Author: eshcar <esh...@yahoo-inc.com> Authored: Sun Jan 8 22:30:44 2017 +0200 Committer: Michael Stack <st...@apache.org> Committed: Thu Jan 12 06:35:58 2017 -0800 ---------------------------------------------------------------------- .../hbase/regionserver/CompactingMemStore.java | 6 +- .../hbase/regionserver/CompactionPipeline.java | 78 ++++++++++++-------- .../apache/hadoop/hbase/io/TestHeapSize.java | 2 + 3 files changed, 53 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index e1289f8..99c1685 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -217,8 +217,8 @@ public class CompactingMemStore extends AbstractMemStore { @VisibleForTesting @Override protected List<Segment> getSegments() { - List<Segment> pipelineList = pipeline.getSegments(); - List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2); + List<? 
extends Segment> pipelineList = pipeline.getSegments(); + List<Segment> list = new ArrayList<>(pipelineList.size() + 2); list.add(this.active); list.addAll(pipelineList); list.add(this.snapshot); @@ -264,7 +264,7 @@ public class CompactingMemStore extends AbstractMemStore { * Scanners are ordered from 0 (oldest) to newest in increasing order. */ public List<KeyValueScanner> getScanners(long readPt) throws IOException { - List<Segment> pipelineList = pipeline.getSegments(); + List<? extends Segment> pipelineList = pipeline.getSegments(); long order = pipelineList.size(); // The list of elements in pipeline + the active element + the snapshot segment // TODO : This will change when the snapshot is made of more than one element http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 9d5df77..fafdbee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -25,50 +25,65 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; /** * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments. - * It supports pushing a segment at the head of the pipeline and pulling a segment from the - * tail to flush to disk. 
- * It also supports swap operation to allow the compactor swap a subset of the segments with a new - * (compacted) one. This swap succeeds only if the version number passed with the list of segments - * to swap is the same as the current version of the pipeline. - * The pipeline version is updated whenever swapping segments or pulling the segment at the tail. + * It supports pushing a segment at the head of the pipeline and removing a segment from the + * tail when it is flushed to disk. + * It also supports a swap method to allow the in-memory compaction to swap a subset of the segments + * at the tail of the pipeline with a new (compacted) one. This swap succeeds only if the version + * number passed with the list of segments to swap is the same as the current version of the + * pipeline. + * Essentially, there are two methods which can change the structure of the pipeline: pushHead() + * and swap(); the latter is used both by a flush to disk and by an in-memory compaction. + * The pipeline version is updated by swap(); it allows identifying conflicting operations at the + * suffix of the pipeline. + * + * The synchronization model is copy-on-write. Methods which change the structure of the + * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make + * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read + * method accesses the read-only copy more than once it makes a local copy of it + * to ensure it accesses the same copy. + * + * The methods getVersionedList(), getVersionedTail(), and flattenYoungestSegment() are also + * protected by a lock since they need to have a consistent (atomic) view of the pipeline list + * and version number. 
*/ @InterfaceAudience.Private public class CompactionPipeline { private static final Log LOG = LogFactory.getLog(CompactionPipeline.class); public final static long FIXED_OVERHEAD = ClassSize - .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); - public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST; + .align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG); + public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST); private final RegionServicesForStores region; - private LinkedList<ImmutableSegment> pipeline; - private long version; + private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>(); + // The list is volatile to avoid reading a new allocated reference before the c'tor is executed + private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>(); + // Version is volatile to ensure it is atomically read when not using a lock + private volatile long version = 0; public CompactionPipeline(RegionServicesForStores region) { this.region = region; - this.pipeline = new LinkedList<>(); - this.version = 0; } public boolean pushHead(MutableSegment segment) { ImmutableSegment immutableSegment = SegmentFactory.instance(). createImmutableSegment(segment); synchronized (pipeline){ - return addFirst(immutableSegment); + boolean res = addFirst(immutableSegment); + readOnlyCopy = new LinkedList<>(pipeline); + return res; } } public VersionedSegmentsList getVersionedList() { synchronized (pipeline){ - List<ImmutableSegment> segmentList = new ArrayList<>(pipeline); - return new VersionedSegmentsList(segmentList, version); + return new VersionedSegmentsList(readOnlyCopy, version); } } @@ -93,8 +108,10 @@ public class CompactionPipeline { * During index merge op this will be false and for compaction it will be true. 
* @return true iff swapped tail with new segment */ - public boolean swap( - VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) { + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", + justification="Increment is done under a synchronize block so safe") + public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment, + boolean closeSuffix) { if (versionedList.getVersion() != version) { return false; } @@ -115,6 +132,8 @@ public class CompactionPipeline { + ", and the number of cells in new segment is:" + count); } swapSuffix(suffix, segment, closeSuffix); + readOnlyCopy = new LinkedList<>(pipeline); + version++; } if (closeSuffix && region != null) { // update the global memstore size counter @@ -193,35 +212,34 @@ public class CompactionPipeline { } public boolean isEmpty() { - return pipeline.isEmpty(); + return readOnlyCopy.isEmpty(); } - public List<Segment> getSegments() { - synchronized (pipeline){ - return new LinkedList<>(pipeline); - } + public List<? extends Segment> getSegments() { + return readOnlyCopy; } public long size() { - return pipeline.size(); + return readOnlyCopy.size(); } public long getMinSequenceId() { long minSequenceId = Long.MAX_VALUE; - if (!isEmpty()) { - minSequenceId = pipeline.getLast().getMinSequenceId(); + LinkedList<? extends Segment> localCopy = readOnlyCopy; + if (!localCopy.isEmpty()) { + minSequenceId = localCopy.getLast().getMinSequenceId(); } return minSequenceId; } public MemstoreSize getTailSize() { - if (isEmpty()) return MemstoreSize.EMPTY_SIZE; - return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead()); + LinkedList<? 
extends Segment> localCopy = readOnlyCopy; + if (localCopy.isEmpty()) return MemstoreSize.EMPTY_SIZE; + return new MemstoreSize(localCopy.peekLast().keySize(), localCopy.peekLast().heapOverhead()); } - private void swapSuffix(List<ImmutableSegment> suffix, ImmutableSegment segment, + private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { - version++; // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data http://git-wip-us.apache.org/repos/asf/hbase/blob/2f8ddf6f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 6e8f831..ceaadbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -325,6 +325,7 @@ public class TestHeapSize { expected += ClassSize.estimateBase(AtomicBoolean.class, false); expected += ClassSize.estimateBase(CompactionPipeline.class, false); expected += ClassSize.estimateBase(LinkedList.class, false); + expected += ClassSize.estimateBase(LinkedList.class, false); expected += ClassSize.estimateBase(MemStoreCompactor.class, false); expected += ClassSize.estimateBase(AtomicBoolean.class, false); if (expected != actual) { @@ -333,6 +334,7 @@ public class TestHeapSize { ClassSize.estimateBase(AtomicBoolean.class, true); ClassSize.estimateBase(CompactionPipeline.class, true); ClassSize.estimateBase(LinkedList.class, true); + ClassSize.estimateBase(LinkedList.class, true); 
ClassSize.estimateBase(MemStoreCompactor.class, true); ClassSize.estimateBase(AtomicBoolean.class, true); assertEquals(expected, actual);