[
https://issues.apache.org/jira/browse/HBASE-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
chenglei updated HBASE-26384:
-----------------------------
Description:
When {{CompactingMemStore}} prepares flushing, {{CompactingMemStore.snapshot}}
uses {{CompactionPipeline#version}} to track whether the
{{CompactionPipeline#pipeline}} has
changed since it acquired {{VersionedSegmentsList}} last time before emptying
{{CompactionPipeline#pipeline}}. However, when
{{CompactingMemStore#inMemoryCompaction}} executes
{{CompactionPipeline#flattenOneSegment}}, it does not change the
{{CompactionPipeline#version}} , which could causes the segment which has be
already collected to {{MemStoreSnapshot}} may still be remained in
CompactingMemStore.
Considing following thread sequence:
# When {{CompactingMemStore}} size exceeds
{{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new
{{ImmutableSegment}} to the head of
{{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}}
asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
# The in memory compact thread starts and then stopping before
{{CompactingMemStore#flattenOneSegment}}.
# The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently,
after the snapshot thread
executing fllowing line 570 {{pipeline.getVersionedList}}, the in memory
compact thread continues.
{code:java}
565 private void pushPipelineToSnapshot() {
566 int iterationsCnt = 0;
567 boolean done = false;
568 while (!done) {
569 iterationsCnt++;
570 VersionedSegmentsList segments = pipeline.getVersionedList();
571 pushToSnapshot(segments.getStoreSegments());
572 // swap can return false in case the pipeline was updated by
ongoing compaction
573 // and the version increase, the chance of it happenning is
very low
574 // In Swap: don't close segments (they are in snapshot now)
and don't update the region size
575 done = pipeline.swap(segments, null, false, false);
.......
}
{code}
Assuming {{VersionedSegmentsList#version}} returned from
{{CompactingMemStore#getPipelineVersionedSegmentsList}} is v.
# The snapshot thread stopping before {{pipeline.swap(segments, null, false,
false)}} in above line 575.
# The in memory compact thread completes
{{CompactingMemStore#flattenOneSegment}},{{CompactingMemStore#flattenOneSegment}}
does not change
{{CompactionPipeline#version}}, {@link CompactionPipeline#version} is still
v.
# The snapshot thread continues {{pipeline.swap(segments, null, false,
false)}} in above line 575, and because {{CompactionPipeline#version}}
is still v, {{pipeline.swap(segments, null, false, false)}} successfully
invokes following {{swapSuffix}} to remove {{ImmutableSegment}} in
{{CompactionPipeline}} ,
{code:java}
293 private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment
segment,
294 boolean closeSegmentsInSuffix) {
295 pipeline.removeAll(suffix);
296 if(segment != null) pipeline.addLast(segment);
....
{code}
but the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because
{{CompactingMemStore#flattenOneSegment}} is completed in step 6, so
the {{ImmutableSegment}} in {{CompactionPipeline}} is not removed and is
still remained in {{CompactionPipeline}}.However the snapshot thread think
{{pipeline.swap(segments, null, false, false)}} and
{{CompactingMemStore.snapshot}} is successful and continues to flush the
{{Segment}} got by {{pipeline.getVersionedList}} in
step 3 as normal, but the {{Segment}} with the same cells still left in
{{CompactingMemStore}}.
Leaving {{Segment}} which already flushed still in {{MemStore}} is dangerous:
if a Major Compaction before the left {{Segment}} flushing, there may be data
erroneous.
was:
When {{CompactingMemStore}} prepares flushing, {{CompactingMemStore.snapshot}}
uses {{CompactionPipeline#version}} to track whether the
{{CompactionPipeline#pipeline}} has
changed since it acquired {{VersionedSegmentsList}} last time before emptying
{{CompactionPipeline#pipeline}}. However, when
{{CompactingMemStore#inMemoryCompaction}} executes
{{CompactionPipeline#flattenOneSegment}}, it does not change the
{{CompactionPipeline#version}} , which could causes the segment which has be
already collected to {{MemStoreSnapshot}} may still be remained in
CompactingMemStore.
Considing following thread sequence:
# When {{CompactingMemStore}} size exceeds
{{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new
{{ImmutableSegment}} to the head of
{{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}}
asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
# The in memory compact thread starts and then stopping before
{{CompactingMemStore#flattenOneSegment}}.
# The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently,
after the snapshot thread
executing fllowing line 570 {{pipeline.getVersionedList}}, the in memory
compact thread continues.
{code:java}
565 private void pushPipelineToSnapshot() {
566 int iterationsCnt = 0;
567 boolean done = false;
568 while (!done) {
569 iterationsCnt++;
570 VersionedSegmentsList segments = pipeline.getVersionedList();
571 pushToSnapshot(segments.getStoreSegments());
572 // swap can return false in case the pipeline was updated by
ongoing compaction
573 // and the version increase, the chance of it happenning is
very low
574 // In Swap: don't close segments (they are in snapshot now)
and don't update the region size
575 done = pipeline.swap(segments, null, false, false);
.......
}
{code}
Assuming {{VersionedSegmentsList#version}} returned from
{{CompactingMemStore#getPipelineVersionedSegmentsList}} is v.
# The snapshot thread stopping before {{pipeline.swap(segments, null, false,
false)}} in above line 575.
# The in memory compact thread completes
{{CompactingMemStore#flattenOneSegment}},{{CompactingMemStore#flattenOneSegment}}
does not change
{{CompactionPipeline#version}}, {@link CompactionPipeline#version} is still
v.
# The snapshot thread continues {{pipeline.swap(segments, null, false,
false)}} in above line 575, and because {{CompactionPipeline#version}}
is still v, {{pipeline.swap(segments, null, false, false)}} successfully
invokes following {{swapSuffix}} to remove {{ImmutableSegment}} in
{{CompactionPipeline}} ,
{code:java}
293 private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment
segment,
294 boolean closeSegmentsInSuffix) {
295 pipeline.removeAll(suffix);
296 if(segment != null) pipeline.addLast(segment);
....
{code:java}
but the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because
{{CompactingMemStore#flattenOneSegment}} is completed in step 6, so
the {{ImmutableSegment}} in {{CompactionPipeline}} is not removed and is
still remained in {{CompactionPipeline}}.However the snapshot thread think
{{pipeline.swap(segments, null, false, false)}} and
{{CompactingMemStore.snapshot}} is successful and continues to flush the
{{Segment}} got by {{pipeline.getVersionedList}} got in
step 3 to hfile as normal, but the {{Segment}} with the same cells still left
in {{CompactingMemStore}}. Left
> Segment which already flushed to hfile may still be remained in
> CompactingMemStore
> ------------------------------------------------------------------------------------
>
> Key: HBASE-26384
> URL: https://issues.apache.org/jira/browse/HBASE-26384
> Project: HBase
> Issue Type: Bug
> Components: in-memory-compaction
> Affects Versions: 3.0.0-alpha-1, 2.4.7
> Reporter: chenglei
> Priority: Major
>
> When {{CompactingMemStore}} prepares flushing,
> {{CompactingMemStore.snapshot}} uses {{CompactionPipeline#version}} to track
> whether the {{CompactionPipeline#pipeline}} has
> changed since it acquired {{VersionedSegmentsList}} last time before
> emptying {{CompactionPipeline#pipeline}}. However, when
> {{CompactingMemStore#inMemoryCompaction}} executes
> {{CompactionPipeline#flattenOneSegment}}, it does not change the
> {{CompactionPipeline#version}} , which could causes the segment which has be
> already collected to {{MemStoreSnapshot}} may still be remained in
> CompactingMemStore.
> Considing following thread sequence:
> # When {{CompactingMemStore}} size exceeds
> {{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new
> {{ImmutableSegment}} to the head of
> {{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}}
> asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
> # The in memory compact thread starts and then stopping before
> {{CompactingMemStore#flattenOneSegment}}.
> # The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently,
> after the snapshot thread
> executing fllowing line 570 {{pipeline.getVersionedList}}, the in memory
> compact thread continues.
> {code:java}
> 565 private void pushPipelineToSnapshot() {
> 566 int iterationsCnt = 0;
> 567 boolean done = false;
> 568 while (!done) {
> 569 iterationsCnt++;
> 570 VersionedSegmentsList segments =
> pipeline.getVersionedList();
> 571 pushToSnapshot(segments.getStoreSegments());
> 572 // swap can return false in case the pipeline was updated
> by ongoing compaction
> 573 // and the version increase, the chance of it happenning is
> very low
> 574 // In Swap: don't close segments (they are in snapshot now)
> and don't update the region size
> 575 done = pipeline.swap(segments, null, false, false);
> .......
> }
> {code}
> Assuming {{VersionedSegmentsList#version}} returned from
> {{CompactingMemStore#getPipelineVersionedSegmentsList}} is v.
> # The snapshot thread stopping before {{pipeline.swap(segments, null, false,
> false)}} in above line 575.
> # The in memory compact thread completes
> {{CompactingMemStore#flattenOneSegment}},{{CompactingMemStore#flattenOneSegment}}
> does not change
> {{CompactionPipeline#version}}, {@link CompactionPipeline#version} is
> still v.
> # The snapshot thread continues {{pipeline.swap(segments, null, false,
> false)}} in above line 575, and because {{CompactionPipeline#version}}
> is still v, {{pipeline.swap(segments, null, false, false)}} successfully
> invokes following {{swapSuffix}} to remove {{ImmutableSegment}} in
> {{CompactionPipeline}} ,
> {code:java}
> 293 private void swapSuffix(List<? extends Segment> suffix,
> ImmutableSegment segment,
> 294 boolean closeSegmentsInSuffix) {
> 295 pipeline.removeAll(suffix);
> 296 if(segment != null) pipeline.addLast(segment);
> ....
> {code}
> but the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because
> {{CompactingMemStore#flattenOneSegment}} is completed in step 6, so
> the {{ImmutableSegment}} in {{CompactionPipeline}} is not removed and is
> still remained in {{CompactionPipeline}}.However the snapshot thread think
> {{pipeline.swap(segments, null, false, false)}} and
> {{CompactingMemStore.snapshot}} is successful and continues to flush the
> {{Segment}} got by {{pipeline.getVersionedList}} in
> step 3 as normal, but the {{Segment}} with the same cells still left in
> {{CompactingMemStore}}.
> Leaving {{Segment}} which already flushed still in {{MemStore}} is
> dangerous: if a Major Compaction before the left {{Segment}} flushing, there
> may be data erroneous.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)