[ 
https://issues.apache.org/jira/browse/HBASE-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenglei updated HBASE-26384:
-----------------------------
    Description: 
When {{CompactingMemStore}} prepares to flush, {{CompactingMemStore.snapshot}} 
invokes the following {{CompactingMemStore.pushPipelineToSnapshot}} method to 
get the {{Snapshot}}. Lines 570 and 575 below use 
{{CompactionPipeline#version}} to detect whether the Segments in 
{{CompactionPipeline#pipeline}} have changed between reading the 
{{VersionedSegmentsList}} in line 570 and emptying 
{{CompactionPipeline#pipeline}} in line 575.
  {code:java}
  565    private void pushPipelineToSnapshot() {
  566        int iterationsCnt = 0;
  567        boolean done = false;
  568        while (!done) {
  569              iterationsCnt++;
  570              VersionedSegmentsList segments = pipeline.getVersionedList();
  571              pushToSnapshot(segments.getStoreSegments());
  572              // swap can return false in case the pipeline was updated by ongoing compaction
  573              // and the version increase, the chance of it happenning is very low
  574              // In Swap: don't close segments (they are in snapshot now) and don't update the region size
  575              done = pipeline.swap(segments, null, false, false);
                    .......
  }
   {code}
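To illustrate the version check that lines 570 and 575 rely on, here is a minimal single-threaded sketch. It is not HBase code: {{ToyVersionedPipeline}}, its {{VersionedList}} and the string segments are hypothetical stand-ins, and the only assumption is that swap rejects a list read at an older version and bumps the version when it succeeds.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Toy model of an optimistic version check. Not HBase code; every name is hypothetical.
class ToyVersionedPipeline {
  /** Pipeline contents paired with the version at which they were read. */
  static final class VersionedList {
    final List<String> segments;
    final long version;

    VersionedList(List<String> segments, long version) {
      this.segments = segments;
      this.version = version;
    }
  }

  private final LinkedList<String> pipeline = new LinkedList<>();
  private long version = 0;

  synchronized void add(String segment) {
    pipeline.addLast(segment);
  }

  synchronized VersionedList getVersionedList() {
    return new VersionedList(new ArrayList<>(pipeline), version);
  }

  /** Removes the listed segments only if nothing bumped the version since they were read. */
  synchronized boolean swap(VersionedList list, String replacement) {
    if (list.version != version) {
      return false; // stale view: the caller must re-read and retry
    }
    pipeline.removeAll(list.segments);
    if (replacement != null) {
      pipeline.addLast(replacement);
    }
    version++;
    return true;
  }

  synchronized int size() {
    return pipeline.size();
  }
}

class ToyVersionCheckDemo {
  public static void main(String[] args) {
    ToyVersionedPipeline p = new ToyVersionedPipeline();
    p.add("s1");
    p.add("s2");
    ToyVersionedPipeline.VersionedList stale = p.getVersionedList();
    // A competing compaction merges s1 and s2, which bumps the version.
    p.swap(p.getVersionedList(), "s1+s2");
    System.out.println(p.swap(stale, null));                // false: version moved on
    System.out.println(p.swap(p.getVersionedList(), null)); // true: fresh view
    System.out.println(p.size());                           // 0
  }
}
```

The check only protects against changes that bump the version, so any pipeline mutation that leaves the version untouched is invisible to it.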
However, when {{CompactingMemStore#inMemoryCompaction}} executes 
{{CompactionPipeline#flattenOneSegment}}, it does not change 
{{CompactionPipeline#version}}. If an in-memory compaction executes 
{{CompactingMemStore#flattenOneSegment}} between line 570 and line 575 above, 
{{CompactionPipeline#version}} stays the same even though the 
{{ImmutableSegment}} in {{CompactionPipeline}} has changed. Because the version 
is unchanged, {{pipeline.swap(segments, null, false, false)}} in line 575 
passes its version check and invokes the following 
{{CompactionPipeline#swapSuffix}} method to remove the {{Segment}} from 
{{CompactionPipeline}}. But the {{Segment}} in {{CompactionPipeline}} has 
already changed, because {{CompactingMemStore#flattenOneSegment}} has 
completed, so line 295 below removes nothing and the {{Segment}} remains in 
{{CompactionPipeline}}. 
  {code:java}
  293  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
  294         boolean closeSegmentsInSuffix) {
  295      pipeline.removeAll(suffix);
  296      if(segment != null) pipeline.addLast(segment);
             ....
{code}
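The silent no-op in line 295 can be reproduced with plain Java collections. This is a sketch under one assumption that goes beyond the report: flattening installs a new segment object carrying the same cells in place of the old one. {{ToySegment}} and {{flattenAt}} are hypothetical names, not HBase classes or methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Toy segment: no equals() override, so list removal matches by identity only.
class ToySegment {
  final List<String> cells;
  final boolean flat;

  ToySegment(List<String> cells, boolean flat) {
    this.cells = cells;
    this.flat = flat;
  }
}

class ToyStaleSuffixDemo {
  /** Hypothetical flatten: swaps in a new object that carries the same cells. */
  static void flattenAt(LinkedList<ToySegment> pipeline, int index) {
    ToySegment old = pipeline.get(index);
    pipeline.set(index, new ToySegment(old.cells, true));
  }

  public static void main(String[] args) {
    LinkedList<ToySegment> pipeline = new LinkedList<>();
    pipeline.add(new ToySegment(Arrays.asList("row1", "row2"), false));

    // Snapshot side: copy the current segments (line 570 in the report).
    List<ToySegment> suffix = new ArrayList<>(pipeline);

    // Compaction side: flatten completes before the swap runs.
    flattenAt(pipeline, 0);

    // Swap side: the stale objects match nothing (line 295 in the report).
    boolean removed = pipeline.removeAll(suffix);
    System.out.println(removed);         // false
    System.out.println(pipeline.size()); // 1
    // The leftover segment holds the same cells that were just snapshotted.
    System.out.println(pipeline.get(0).cells.equals(suffix.get(0).cells)); // true
  }
}
```

{{removeAll}} reports the miss through its return value, but line 295 discards that value, which is why the caller cannot tell that nothing was removed.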

However, {{CompactingMemStore.snapshot}} assumes the swap succeeded and 
continues to flush the {{Segment}} it obtained as normal, while a {{Segment}} 
with the same cells is still left in {{CompactingMemStore}}. Leaving an 
already-flushed {{Segment}} in the {{MemStore}} is dangerous: if a Major 
Compaction runs before the leftover {{Segment}} is flushed, the data may become 
incorrect.

My fix in the PR is as follows:
# Increase {{CompactionPipeline#version}} in 
{{CompactingMemStore#flattenOneSegment}}.
# In {{CompactionPipeline#swapSuffix}}, explicitly check that the Segments in 
the {{suffix}} input parameter are the same as the Segments in {{pipeline}}, 
comparing one by one from the last element of {{suffix}} to the first.
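The two guards can be sketched roughly as follows. This is not the actual patch: {{ToyGuardedPipeline}}, {{Seg}}, {{flattenAt}} and {{matchesTail}} are hypothetical names, and returning {{false}} on a suffix mismatch is an assumption made for the sketch (the real change may react differently).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Toy model of the two proposed guards. Not the actual patch; every name is hypothetical.
class ToyGuardedPipeline {
  static final class Seg {
    final List<String> cells;

    Seg(List<String> cells) {
      this.cells = cells;
    }
  }

  private final LinkedList<Seg> pipeline = new LinkedList<>();
  private long version = 0;

  synchronized void pushHead(Seg s) { pipeline.addFirst(s); }
  synchronized long getVersion() { return version; }
  synchronized List<Seg> copy() { return new ArrayList<>(pipeline); }
  synchronized int size() { return pipeline.size(); }

  /** Guard 1: flattening replaces a segment object, so it also bumps the version. */
  synchronized void flattenAt(int index) {
    Seg old = pipeline.get(index);
    pipeline.set(index, new Seg(old.cells));
    version++;
  }

  /** Guard 2: the suffix must equal the pipeline tail, compared by identity from last to first. */
  private boolean matchesTail(List<Seg> suffix) {
    if (suffix.size() > pipeline.size()) {
      return false;
    }
    Iterator<Seg> fromTail = pipeline.descendingIterator();
    for (int i = suffix.size() - 1; i >= 0; i--) {
      if (fromTail.next() != suffix.get(i)) {
        return false;
      }
    }
    return true;
  }

  /** Assumption of this sketch: a mismatch is reported as false so the caller retries. */
  synchronized boolean swap(List<Seg> suffix, long readVersion) {
    if (readVersion != version || !matchesTail(suffix)) {
      return false;
    }
    for (int i = 0; i < suffix.size(); i++) {
      pipeline.removeLast();
    }
    version++;
    return true;
  }
}

class ToyGuardedDemo {
  public static void main(String[] args) {
    ToyGuardedPipeline p = new ToyGuardedPipeline();
    p.pushHead(new ToyGuardedPipeline.Seg(Arrays.asList("row1")));
    List<ToyGuardedPipeline.Seg> stale = p.copy();
    long staleVersion = p.getVersion();
    p.flattenAt(0); // compaction slips in between read and swap
    System.out.println(p.swap(stale, staleVersion)); // false: guard 1, version changed
    List<ToyGuardedPipeline.Seg> foreign =
        Arrays.asList(new ToyGuardedPipeline.Seg(Arrays.asList("row1")));
    System.out.println(p.swap(foreign, p.getVersion())); // false: guard 2, identity mismatch
    System.out.println(p.swap(p.copy(), p.getVersion())); // true: fresh view
    System.out.println(p.size()); // 0
  }
}
```

The first guard makes the existing version check see the flatten; the second is a defensive check that would catch any other mutation that forgets to bump the version.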


 



  was:
When {{CompactingMemStore}} prepares to flush, {{CompactingMemStore.snapshot}} 
uses {{CompactionPipeline#version}} to detect whether the Segments in 
{{CompactionPipeline#pipeline}} have changed since it last read the 
{{VersionedSegmentsList}}, before emptying {{CompactionPipeline#pipeline}}. 
However, when {{CompactingMemStore#inMemoryCompaction}} executes 
{{CompactionPipeline#flattenOneSegment}}, it does not change 
{{CompactionPipeline#version}}, so a segment that has already been collected 
into the {{MemStoreSnapshot}} may still remain in {{CompactingMemStore}}. 
Consider the following thread sequence:
1. When the {{CompactingMemStore}} size exceeds 
{{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new 
{{ImmutableSegment}} to the head of {{CompactingMemStore#pipeline}} and starts 
an in-memory compaction thread to execute 
{{CompactingMemStore#inMemoryCompaction}}.
2. The in-memory compaction thread starts and then pauses before 
{{CompactingMemStore#flattenOneSegment}}.
3. The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently. 
After the snapshot thread executes {{pipeline.getVersionedList}} in line 570 
below, the in-memory compaction thread continues. Assume the 
{{VersionedSegmentsList#version}} returned from {{pipeline.getVersionedList}} 
is v.
   {code:java}
  565    private void pushPipelineToSnapshot() {
  566        int iterationsCnt = 0;
  567        boolean done = false;
  568        while (!done) {
  569              iterationsCnt++;
  570              VersionedSegmentsList segments = pipeline.getVersionedList();
  571              pushToSnapshot(segments.getStoreSegments());
  572              // swap can return false in case the pipeline was updated by ongoing compaction
  573              // and the version increase, the chance of it happenning is very low
  574              // In Swap: don't close segments (they are in snapshot now) and don't update the region size
  575              done = pipeline.swap(segments, null, false, false);
                    .......
  }
   {code}
   
4. The snapshot thread pauses before {{pipeline.swap(segments, null, false, 
false)}} in line 575 above.
5. The in-memory compaction thread completes 
{{CompactingMemStore#flattenOneSegment}}, which does not change 
{{CompactionPipeline#version}}, so {{CompactionPipeline#version}} is still v.
6. The snapshot thread continues with {{pipeline.swap(segments, null, false, 
false)}} in line 575 above. Because {{CompactionPipeline#version}} is still v, 
{{pipeline.swap(segments, null, false, false)}} passes its version check and 
invokes the following {{swapSuffix}} to remove the {{ImmutableSegment}} from 
{{CompactionPipeline}}:
  {code:java}
  293  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
  294         boolean closeSegmentsInSuffix) {
  295      pipeline.removeAll(suffix);
  296      if(segment != null) pipeline.addLast(segment);
             ....
{code}
  But the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because 
{{CompactingMemStore#flattenOneSegment}} completed in step 5, so line 295 above 
removes nothing and the {{ImmutableSegment}} remains in 
{{CompactionPipeline}}. However, the snapshot thread assumes 
{{CompactingMemStore.snapshot}} succeeded and continues to flush the 
{{Segment}} obtained from {{pipeline.getVersionedList}} in step 3 as normal, 
while a {{Segment}} with the same cells is still left in 
{{CompactingMemStore}}. 
  Leaving an already-flushed {{Segment}} in the {{MemStore}} is dangerous: if a 
Major Compaction runs before the leftover {{Segment}} is flushed, the data may 
become incorrect.

My fix in the PR is as follows:
# Increase {{CompactionPipeline#version}} in 
{{CompactingMemStore#flattenOneSegment}}.
# For {{CompactionPipeline#swapSuffix}} in step 6 above, explicitly check that 
the Segments in the {{suffix}} input parameter are the same as the Segments in 
{{pipeline}}, comparing one by one from the last element of {{suffix}} to the 
first.


 




> Segment  which already flushed to hfile may still be remained in 
> CompactingMemStore 
> ------------------------------------------------------------------------------------
>
>                 Key: HBASE-26384
>                 URL: https://issues.apache.org/jira/browse/HBASE-26384
>             Project: HBase
>          Issue Type: Bug
>          Components: in-memory-compaction
>    Affects Versions: 3.0.0-alpha-1, 2.4.7
>            Reporter: chenglei
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
