[ 
https://issues.apache.org/jira/browse/HBASE-18158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16036823#comment-16036823
 ] 

Chia-Ping Tsai commented on HBASE-18158:
----------------------------------------

bq. So I still do not see how NPE can happen. Would be happy for more 
clarifications...
The first thread will set the versionedList to null and set isInterrupted to 
false after completing the compaction.
{code}
  private void releaseResources() {
    isInterrupted.set(false);
    versionedList = null;
  }
{code}
So the later thread may keep executing the MERGE policy.
{code}
      if (!isInterrupted.get()) {
        if (resultSwapped = compactingMemStore.swapCompactedSegments(
            versionedList, result, (action==Action.MERGE))) {
          // update the wal so it can be truncated and not get too long
          compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // 
only if greater
        }
      }
{code}
This issue is due to the lower threshold of flush size so i think it won't 
happen in production. I just hope to make TestAcid* stable.


> Two running in-memory compaction threads may lose data
> ------------------------------------------------------
>
>                 Key: HBASE-18158
>                 URL: https://issues.apache.org/jira/browse/HBASE-18158
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>             Fix For: 2.0.0
>
>         Attachments: HBASE-18158.v0.patch
>
>
> {code:title=CompactingMemStore.java|borderStyle=solid}
>   private void stopCompaction() {
>     if (inMemoryFlushInProgress.get()) {
>       compactor.stop();
>       inMemoryFlushInProgress.set(false);
>     }
>   }
> {code}
> The stopCompaction() set inMemoryFlushInProgress to false so there may be two 
> in-memory compaction threads which execute simultaneously. If there are two 
> running InMemoryFlushRunnable, the later InMemoryFlushRunnable may change the 
> versionedList. 
> {code:title=MemStoreCompactor.java|borderStyle=solid}
>   public boolean start() throws IOException {
>     if (!compactingMemStore.hasImmutableSegments()) { // no compaction on 
> empty pipeline
>       return false;
>     }
>     // get a snapshot of the list of the segments from the pipeline,
>     // this local copy of the list is marked with specific version
>     versionedList = compactingMemStore.getImmutableSegments();
> {code}
> And the first InMemoryFlushRunnable will use the chagned versionedList to 
> remove the corresponding segments.
> {code:title=MemStoreCompactor#doCompaction|borderStyle=solid}
>       if (!isInterrupted.get()) {
>         if (resultSwapped = compactingMemStore.swapCompactedSegments(
>             versionedList, result, (action==Action.MERGE))) {
>           // update the wal so it can be truncated and not get too long
>           compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // 
> only if greater
>         }
>       }
> {code}
> In conclusion, first InMemoryFlushRunnable will remove the worng segment. And 
> the later InMemoryFlushRunnable may introduce NPE because first 
> InMemoryFlushRunnable set versionedList to null after compaction.
> {code}
> Exception in thread 
> "RpcServer.default.FPBQ.Fifo.handler=3,queue=0,port=45712-inmemoryCompactions-1496563908038"
>  java.lang.NullPointerException
>         at 
> org.apache.hadoop.hbase.regionserver.CompactionPipeline.swap(CompactionPipeline.java:119)
>         at 
> org.apache.hadoop.hbase.regionserver.CompactingMemStore.swapCompactedSegments(CompactingMemStore.java:283)
>         at 
> org.apache.hadoop.hbase.regionserver.MemStoreCompactor.doCompaction(MemStoreCompactor.java:212)
>         at 
> org.apache.hadoop.hbase.regionserver.MemStoreCompactor.start(MemStoreCompactor.java:122)
>         at 
> org.apache.hadoop.hbase.regionserver.CompactingMemStore.flushInMemory(CompactingMemStore.java:388)
>         at 
> org.apache.hadoop.hbase.regionserver.CompactingMemStore$InMemoryFlushRunnable.run(CompactingMemStore.java:500)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to