[ 
https://issues.apache.org/jira/browse/HBASE-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chenglei updated HBASE-26384:
-----------------------------
    Description: 
When  {{CompactingMemStore}} prepares flushing, {{CompactingMemStore.snapshot}} 
uses {{CompactionPipeline#version}} to track whether the 
{{CompactionPipeline#pipeline}} has
changed since it acquired {{VersionedSegmentsList}}  last time before emptying 
{{CompactionPipeline#pipeline}}.  However, when 
{{CompactingMemStore#inMemoryCompaction}} executes 
{{CompactionPipeline#flattenOneSegment}}, it does not change the  
{{CompactionPipeline#version}} , which could causes the segment  which has be 
already collected to {{MemStoreSnapshot}} may still be remained in 
CompactingMemStore. 
Considing following  thread sequence:
# When {{CompactingMemStore}} size exceeds 
{{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new 
{{ImmutableSegment}}  to the head of 
   {{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}} 
asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
# The in memory compact thread starts and then stopping before 
{{CompactingMemStore#flattenOneSegment}}.
# The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently, 
after the snapshot thread 
   executing fllowing line 570  {{pipeline.getVersionedList}}, the in memory 
compact thread continues.
   {code:java}
  565    private void pushPipelineToSnapshot() {
  566        int iterationsCnt = 0;
  567        boolean done = false;
  568        while (!done) {
  569              iterationsCnt++;
  570              VersionedSegmentsList segments = pipeline.getVersionedList();
  571              pushToSnapshot(segments.getStoreSegments());
  572              // swap can return false in case the pipeline was updated by 
ongoing compaction
  573             // and the version increase, the chance of it happenning is 
very low
  574             // In Swap: don't close segments (they are in snapshot now) 
and don't update the region size
  575            done = pipeline.swap(segments, null, false, false);
                    .......
  }
   {code}
    Assuming {{VersionedSegmentsList#version}} returned from 
{{CompactingMemStore#getPipelineVersionedSegmentsList}} is v.
# The snapshot thread stopping before {{pipeline.swap(segments, null, false, 
false)}} in above line 575.
# The in memory compact thread completes 
{{CompactingMemStore#flattenOneSegment}},{{CompactingMemStore#flattenOneSegment}}
 does not change  
   {{CompactionPipeline#version}}, {@link CompactionPipeline#version} is still 
v.
# The snapshot thread continues  {{pipeline.swap(segments, null, false, 
false)}} in above line 575, and because {{CompactionPipeline#version}}
   is still v, {{pipeline.swap(segments, null, false, false)}}  successfully 
invokes following {{swapSuffix}} to remove {{ImmutableSegment}} in 
{{CompactionPipeline}} , 
  {code:java}
  293  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment 
segment,
  294         boolean closeSegmentsInSuffix) {
  295      pipeline.removeAll(suffix);
  296      if(segment != null) pipeline.addLast(segment);
             ....
{code:java}
  but the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because 
{{CompactingMemStore#flattenOneSegment}} is completed in step 6, so
  the {{ImmutableSegment}} in {{CompactionPipeline}} is not removed and is 
still remained in {{CompactionPipeline}}.However the snapshot thread think 
  {{pipeline.swap(segments, null, false, false)}} and 
{{CompactingMemStore.snapshot}} is successful and continues to flush the 
{{Segment}} got by {{pipeline.getVersionedList}} got in 
  step 3 to hfile as normal, but the {{Segment}} with the same cells still left 
in   {{CompactingMemStore}}. Left 

 



  was:
When  {{CompactingMemStore}} prepares flushing, {{CompactingMemStore.snapshot}} 
uses {{CompactionPipeline#version}} to track whether the 
{{CompactionPipeline#pipeline}} has
changed since it acquired {{VersionedSegmentsList}}  last time before emptying 
{{CompactionPipeline#pipeline}}.  However, when 
{{CompactingMemStore#inMemoryCompaction}} executes 
{{CompactionPipeline#flattenOneSegment}}, it does not change the  
{{CompactionPipeline#version}} , which could causes the segment  which has be 
already collected to {{MemStoreSnapshot}} may still be remained in 
CompactingMemStore. 
Considing following  thread sequence:
# When {{CompactingMemStore}} size exceeds 
{{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new 
{{ImmutableSegment}}  to the head of 
   {{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}} 
asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
# The in memory compact thread starts and then stopping before 
{{CompactingMemStore#flattenOneSegment}}
# The snapshot thread starts {{CompactingMemStore#getSnapshot}} concurrently, 
after the snapshot thread 
   executing fllowing line 570  {{pipeline.getVersionedList}}, the in memory 
compact thread continues.
   {code:java}
  565    private void pushPipelineToSnapshot() {
  566        int iterationsCnt = 0;
  567        boolean done = false;
  568        while (!done) {
  569              iterationsCnt++;
  570              VersionedSegmentsList segments = pipeline.getVersionedList();
  571              pushToSnapshot(segments.getStoreSegments());
  572            // swap can return false in case the pipeline was updated by 
ongoing compaction
  573             // and the version increase, the chance of it happenning is 
very low
  574           // In Swap: don't close segments (they are in snapshot now) and 
don't update the region size
  575          done = pipeline.swap(segments, null, false, false);
                    .......
  }
   {code}
    Assuming {{VersionedSegmentsList#version}} returned from 
{{CompactingMemStore#getPipelineVersionedSegmentsList}}
   *    is v.
   * 4. The snapshot thread stopping before {@link 
CompactingMemStore#swapPipelineWithNull}
   * 5. The in memory compact thread completes {@link 
CompactingMemStore#flattenOneSegment},
   *    {@link CompactingMemStore#flattenOneSegment} does not change {@link 
CompactionPipeline#version},
   *    {@link CompactionPipeline#version} is still v.
   * 5. The snapshot thread continues {@link 
CompactingMemStore#swapPipelineWithNull}, and because {@link 
CompactionPipeline#version}
   *    is still v, {@link CompactingMemStore#swapPipelineWithNull} succeeds, 
but the {@link ImmutableSegment}
   *    in {@link CompactionPipeline} has changed because {@link 
CompactingMemStore#flattenOneSegment}, so
   *    the {@link ImmutableSegment} in {@link CompactionPipeline} is not 
removed and is still remained in
   *    {@link CompactionPipeline}.  




> Segment  which already flushed to hfile may still be remained in 
> CompactingMemStore 
> ------------------------------------------------------------------------------------
>
>                 Key: HBASE-26384
>                 URL: https://issues.apache.org/jira/browse/HBASE-26384
>             Project: HBase
>          Issue Type: Bug
>          Components: in-memory-compaction
>    Affects Versions: 3.0.0-alpha-1, 2.4.7
>            Reporter: chenglei
>            Priority: Major
>
> When  {{CompactingMemStore}} prepares flushing, 
> {{CompactingMemStore.snapshot}} uses {{CompactionPipeline#version}} to track 
> whether the {{CompactionPipeline#pipeline}} has
> changed since it acquired {{VersionedSegmentsList}}  last time before 
> emptying {{CompactionPipeline#pipeline}}.  However, when 
> {{CompactingMemStore#inMemoryCompaction}} executes 
> {{CompactionPipeline#flattenOneSegment}}, it does not change the  
> {{CompactionPipeline#version}} , which could causes the segment  which has be 
> already collected to {{MemStoreSnapshot}} may still be remained in 
> CompactingMemStore. 
> Considing following  thread sequence:
> # When {{CompactingMemStore}} size exceeds 
> {{CompactingMemStore#getInmemoryFlushSize}}, the write thread adds a new 
> {{ImmutableSegment}}  to the head of 
>    {{CompactingMemStore#pipeline}}, and start {{InMemoryCompactionRunnable}} 
> asynchronously in {{CompactingMemStore#tryFlushInMemoryAndCompactingAsync}}.
> # The in memory compact thread starts and then stopping before 
> {{CompactingMemStore#flattenOneSegment}}.
> # The snapshot thread starts {{CompactingMemStore#snapshot}} concurrently, 
> after the snapshot thread 
>    executing fllowing line 570  {{pipeline.getVersionedList}}, the in memory 
> compact thread continues.
>    {code:java}
>   565    private void pushPipelineToSnapshot() {
>   566        int iterationsCnt = 0;
>   567        boolean done = false;
>   568        while (!done) {
>   569              iterationsCnt++;
>   570              VersionedSegmentsList segments = 
> pipeline.getVersionedList();
>   571              pushToSnapshot(segments.getStoreSegments());
>   572              // swap can return false in case the pipeline was updated 
> by ongoing compaction
>   573             // and the version increase, the chance of it happenning is 
> very low
>   574             // In Swap: don't close segments (they are in snapshot now) 
> and don't update the region size
>   575            done = pipeline.swap(segments, null, false, false);
>                     .......
>   }
>    {code}
>     Assuming {{VersionedSegmentsList#version}} returned from 
> {{CompactingMemStore#getPipelineVersionedSegmentsList}} is v.
> # The snapshot thread stopping before {{pipeline.swap(segments, null, false, 
> false)}} in above line 575.
> # The in memory compact thread completes 
> {{CompactingMemStore#flattenOneSegment}},{{CompactingMemStore#flattenOneSegment}}
>  does not change  
>    {{CompactionPipeline#version}}, {@link CompactionPipeline#version} is 
> still v.
> # The snapshot thread continues  {{pipeline.swap(segments, null, false, 
> false)}} in above line 575, and because {{CompactionPipeline#version}}
>    is still v, {{pipeline.swap(segments, null, false, false)}}  successfully 
> invokes following {{swapSuffix}} to remove {{ImmutableSegment}} in 
> {{CompactionPipeline}} , 
>   {code:java}
>   293  private void swapSuffix(List<? extends Segment> suffix, 
> ImmutableSegment segment,
>   294         boolean closeSegmentsInSuffix) {
>   295      pipeline.removeAll(suffix);
>   296      if(segment != null) pipeline.addLast(segment);
>              ....
> {code:java}
>   but the {{ImmutableSegment}} in {{CompactionPipeline}} has changed because 
> {{CompactingMemStore#flattenOneSegment}} is completed in step 6, so
>   the {{ImmutableSegment}} in {{CompactionPipeline}} is not removed and is 
> still remained in {{CompactionPipeline}}.However the snapshot thread think 
>   {{pipeline.swap(segments, null, false, false)}} and 
> {{CompactingMemStore.snapshot}} is successful and continues to flush the 
> {{Segment}} got by {{pipeline.getVersionedList}} got in 
>   step 3 to hfile as normal, but the {{Segment}} with the same cells still 
> left in   {{CompactingMemStore}}. Left 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to