This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6aaef89789792cd7dd1483d10276273b20a35fa3
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Thu Jul 29 18:35:19 2021 +0800

    HBASE-26064 Introduce a StoreFileTracker to abstract the store file tracking logic
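    
    A rough usage sketch (not part of this patch, names of local variables are illustrative
    only): callers that previously passed a long list of positional flags to
    HStore.createWriterInTmp can now build the writer parameters fluently and hand them to
    the StoreEngine, e.g.
    
        StoreFileWriter writer = store.getStoreEngine().createWriter(
          CreateStoreFileWriterParams.create()
            .maxKeyCount(cellsCount)
            .compression(store.getColumnFamilyDescriptor().getCompressionType())
            .isCompaction(false)
            .includeMVCCReadpoint(true)
            .includesTag(snapshot.isTagsPresent())
            .shouldDropBehind(false));
    
    Here cellsCount and snapshot stand in for whatever the calling flusher already has in
    scope (compare the DefaultStoreFlusher change below).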
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
---
 .../hadoop/hbase/mob/DefaultMobStoreCompactor.java |  22 +-
 .../hadoop/hbase/mob/DefaultMobStoreFlusher.java   |   4 +-
 .../regionserver/CreateStoreFileWriterParams.java  | 134 ++++
 .../hbase/regionserver/DateTieredStoreEngine.java  |   5 +-
 .../hbase/regionserver/DefaultStoreEngine.java     |   5 +-
 .../hbase/regionserver/DefaultStoreFlusher.java    |  11 +-
 .../hadoop/hbase/regionserver/HMobStore.java       |   3 +-
 .../hbase/regionserver/HRegionFileSystem.java      |  10 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 729 +++++----------------
 .../hadoop/hbase/regionserver/StoreContext.java    |   9 +
 .../hadoop/hbase/regionserver/StoreEngine.java     | 461 ++++++++++++-
 .../hbase/regionserver/StoreFileManager.java       |   9 +
 .../hadoop/hbase/regionserver/StoreFlusher.java    |   9 +-
 .../hadoop/hbase/regionserver/StoreUtils.java      |  37 +-
 .../hbase/regionserver/StripeStoreEngine.java      |   9 +-
 .../hbase/regionserver/StripeStoreFlusher.java     |   9 +-
 .../compactions/AbstractMultiOutputCompactor.java  |   7 +-
 .../hbase/regionserver/compactions/Compactor.java  |  36 +-
 .../regionserver/compactions/DefaultCompactor.java |  16 +-
 .../storefiletracker/DefaultStoreFileTracker.java  |  61 ++
 .../storefiletracker/StoreFileTracker.java         |  75 +++
 .../storefiletracker/StoreFileTrackerBase.java     | 178 +++++
 .../storefiletracker/StoreFileTrackerFactory.java  |  35 +
 .../util/compaction/MajorCompactionRequest.java    |   1 -
 .../org/apache/hadoop/hbase/TestIOFencing.java     |  12 +-
 .../regionserver/TestCacheOnWriteInSchema.java     |   6 +-
 .../hbase/regionserver/TestDefaultStoreEngine.java |   5 +-
 .../hadoop/hbase/regionserver/TestHRegion.java     |   4 +-
 .../hadoop/hbase/regionserver/TestHStore.java      |  33 +-
 .../TestRegionMergeTransactionOnCluster.java       |   6 +-
 .../regionserver/TestStoreFileRefresherChore.java  |   3 +-
 .../regionserver/TestStoreScannerClosure.java      |   6 +-
 .../hbase/regionserver/TestStripeStoreEngine.java  |   2 +
 .../compactions/TestDateTieredCompactor.java       |  12 +-
 .../compactions/TestStripeCompactionPolicy.java    |  12 +-
 .../compactions/TestStripeCompactor.java           |  12 +-
 36 files changed, 1261 insertions(+), 727 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index c45fdff..01fe000 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -144,17 +143,16 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   };
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
-      new CellSinkFactory<StoreFileWriter>() {
-        @Override
-        public StoreFileWriter createWriter(InternalScanner scanner,
-            org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          // make this writer with tags always because of possible new cells with tags.
-          return store.createWriterInTmp(fd.maxKeyCount,
-            major ? majorCompactionCompression : minorCompactionCompression,
-            true, true, true, shouldDropBehind);
-        }
-      };
+    new CellSinkFactory<StoreFileWriter>() {
+      @Override
+      public StoreFileWriter createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
+        boolean shouldDropBehind, boolean major) throws IOException {
+        // make this writer with tags always because of possible new cells with tags.
+        return store.getStoreEngine().createWriter(
+          createParams(fd, shouldDropBehind, major).includeMVCCReadpoint(true).includesTag(true));
+      }
+    };
 
   public DefaultMobStoreCompactor(Configuration conf, HStore store) {
     super(conf, store);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 480b85c..4a1dc7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -127,8 +126,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
-            false, true, true, false);
+        writer = createWriter(snapshot, true);
         IOException e = null;
         try {
           // It's a mob store, flush the cells in a mob way. This is the difference of flushing
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
new file mode 100644
index 0000000..10cd9f0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CreateStoreFileWriterParams.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class CreateStoreFileWriterParams {
+
+  private long maxKeyCount;
+
+  private Compression.Algorithm compression;
+
+  private boolean isCompaction;
+
+  private boolean includeMVCCReadpoint;
+
+  private boolean includesTag;
+
+  private boolean shouldDropBehind;
+
+  private long totalCompactedFilesSize = -1;
+
+  private String fileStoragePolicy = HConstants.EMPTY_STRING;
+
+  private CreateStoreFileWriterParams() {
+  }
+
+  public long maxKeyCount() {
+    return maxKeyCount;
+  }
+
+  public CreateStoreFileWriterParams maxKeyCount(long maxKeyCount) {
+    this.maxKeyCount = maxKeyCount;
+    return this;
+  }
+
+  public Compression.Algorithm compression() {
+    return compression;
+  }
+
+  /**
+   * Set the compression algorithm to use
+   */
+  public CreateStoreFileWriterParams compression(Compression.Algorithm compression) {
+    this.compression = compression;
+    return this;
+  }
+
+  public boolean isCompaction() {
+    return isCompaction;
+  }
+
+  /**
+   * Whether we are creating a new file in a compaction
+   */
+  public CreateStoreFileWriterParams isCompaction(boolean isCompaction) {
+    this.isCompaction = isCompaction;
+    return this;
+  }
+
+  public boolean includeMVCCReadpoint() {
+    return includeMVCCReadpoint;
+  }
+
+  /**
+   * Whether to include MVCC or not
+   */
+  public CreateStoreFileWriterParams includeMVCCReadpoint(boolean includeMVCCReadpoint) {
+    this.includeMVCCReadpoint = includeMVCCReadpoint;
+    return this;
+  }
+
+  public boolean includesTag() {
+    return includesTag;
+  }
+
+  /**
+   * Whether to includesTag or not
+   */
+  public CreateStoreFileWriterParams includesTag(boolean includesTag) {
+    this.includesTag = includesTag;
+    return this;
+  }
+
+  public boolean shouldDropBehind() {
+    return shouldDropBehind;
+  }
+
+  public CreateStoreFileWriterParams shouldDropBehind(boolean shouldDropBehind) {
+    this.shouldDropBehind = shouldDropBehind;
+    return this;
+  }
+
+  public long totalCompactedFilesSize() {
+    return totalCompactedFilesSize;
+  }
+
+  public CreateStoreFileWriterParams totalCompactedFilesSize(long totalCompactedFilesSize) {
+    this.totalCompactedFilesSize = totalCompactedFilesSize;
+    return this;
+  }
+
+  public String fileStoragePolicy() {
+    return fileStoragePolicy;
+  }
+
+  public CreateStoreFileWriterParams fileStoragePolicy(String fileStoragePolicy) {
+    this.fileStoragePolicy = fileStoragePolicy;
+    return this;
+  }
+
+  public static CreateStoreFileWriterParams create() {
+    return new CreateStoreFileWriterParams();
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
index 1df953d..7422d91 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DateTieredStoreEngine.java
@@ -19,18 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * HBASE-15400 This store engine allows us to store data in date tiered layout with exponential
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index 58f8bbb..693b9c9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
@@ -39,8 +38,8 @@ import org.apache.yetus.audience.InterfaceAudience;
  * their derivatives.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class DefaultStoreEngine extends StoreEngine<
-  DefaultStoreFlusher, RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher,
+  RatioBasedCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
   public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
       "hbase.hstore.defaultengine.storeflusher.class";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a7d7fb1..306760d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -21,15 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default implementation of StoreFlusher.
@@ -60,9 +59,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(cellsCount,
-            store.getColumnFamilyDescriptor().getCompressionType(), false, true,
-            snapshot.isTagsPresent(), false);
+        writer = createWriter(snapshot, false);
         IOException e = null;
         try {
           performFlush(scanner, writer, throughputController);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 7ce7f03..b00a50c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -28,7 +28,6 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -158,7 +157,7 @@ public class HMobStore extends HStore {
   protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf,
       CellComparator cellComparator) throws IOException {
     MobStoreEngine engine = new MobStoreEngine();
-    engine.createComponents(conf, store, cellComparator);
+    engine.createComponentsOnce(conf, store, cellComparator);
     return engine;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 667eabf..2f5f8d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -145,7 +145,7 @@ public class HRegionFileSystem {
   //  Temp Helpers
   // ===========================================================================
   /** @return {@link Path} to the region's temp directory, used for file creations */
-  Path getTempDir() {
+  public Path getTempDir() {
     return new Path(getRegionDir(), REGION_TEMP_DIR);
   }
 
@@ -240,11 +240,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final byte[] familyName) throws IOException {
-    return getStoreFiles(Bytes.toString(familyName));
-  }
-
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
+  public List<StoreFileInfo> getStoreFiles(final String familyName) throws IOException {
     return getStoreFiles(familyName, true);
   }
 
@@ -254,7 +250,7 @@ public class HRegionFileSystem {
    * @param familyName Column Family Name
    * @return a set of {@link StoreFileInfo} for the specified family.
    */
-  public Collection<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
+  public List<StoreFileInfo> getStoreFiles(final String familyName, final boolean validate)
       throws IOException {
     Path familyDir = getStoreDir(familyName);
     FileStatus[] files = CommonFSUtils.listStatus(this.fs, familyDir);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 1555bbe..a5b5733 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
@@ -47,8 +48,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -70,17 +69,12 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -110,7 +104,6 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableCollection
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
 
@@ -166,16 +159,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private boolean cacheOnWriteLogged;
 
   /**
-   * RWLock for store operations.
-   * Locked in shared mode when the list of component stores is looked at:
-   *   - all reads/writes to table data
-   *   - checking for split
-   * Locked in exclusive mode when the list of component stores is modified:
-   *   - closing
-   *   - completing a compaction
-   */
-  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  /**
    * Lock specific to archiving compacted store files.  This avoids races around
    * the combination of retrieving the list of compacted files and moving them to
    * the archive directory.  Since this is usually a background process (other than
@@ -283,14 +266,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
 
     this.storeEngine = createStoreEngine(this, this.conf, region.getCellComparator());
-    List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
-    // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
-    // replica's refreshStoreFiles() will also use loadStoreFiles() to refresh its store files and
-    // update the storeSize in the refreshStoreSizeAndTotalBytes() finally (just like compaction) , so
-    // no need calculate the storeSize twice.
-    this.storeSize.addAndGet(getStorefilesSize(hStoreFiles, sf -> true));
-    this.totalUncompressedBytes.addAndGet(getTotalUncompressedBytes(hStoreFiles));
-    this.storeEngine.getStoreFileManager().loadFiles(hStoreFiles);
+    storeEngine.initialize(warmup);
+    refreshStoreSizeAndTotalBytes();
 
     flushRetriesNumber = conf.getInt(
         "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
@@ -510,105 +487,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     this.dataBlockEncoder = blockEncoder;
   }
 
-  /**
-   * Creates an unsorted list of StoreFile loaded in parallel
-   * from the given directory.
-   */
-  private List<HStoreFile> loadStoreFiles(boolean warmup) throws IOException {
-    Collection<StoreFileInfo> files = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
-    return openStoreFiles(files, warmup);
-  }
-
-  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, boolean warmup)
-      throws IOException {
-    if (CollectionUtils.isEmpty(files)) {
-      return Collections.emptyList();
-    }
-    // initialize the thread pool for opening store files in parallel..
-    ThreadPoolExecutor storeFileOpenerThreadPool =
-      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpener-"
-        + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
-    CompletionService<HStoreFile> completionService =
-      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
-
-    int totalValidStoreFile = 0;
-    for (StoreFileInfo storeFileInfo : files) {
-      // The StoreFileInfo will carry store configuration down to HFile, we need to set it to
-      // our store's CompoundConfiguration here.
-      storeFileInfo.setConf(conf);
-      // open each store file in parallel
-      completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo));
-      totalValidStoreFile++;
-    }
-
-    Set<String> compactedStoreFiles = new HashSet<>();
-    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
-    IOException ioe = null;
-    try {
-      for (int i = 0; i < totalValidStoreFile; i++) {
-        try {
-          HStoreFile storeFile = completionService.take().get();
-          if (storeFile != null) {
-            LOG.debug("loaded {}", storeFile);
-            results.add(storeFile);
-            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
-          }
-        } catch (InterruptedException e) {
-          if (ioe == null) {
-            ioe = new InterruptedIOException(e.getMessage());
-          }
-        } catch (ExecutionException e) {
-          if (ioe == null) {
-            ioe = new IOException(e.getCause());
-          }
-        }
-      }
-    } finally {
-      storeFileOpenerThreadPool.shutdownNow();
-    }
-    if (ioe != null) {
-      // close StoreFile readers
-      boolean evictOnClose =
-          getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
-      for (HStoreFile file : results) {
-        try {
-          if (file != null) {
-            file.closeStoreFile(evictOnClose);
-          }
-        } catch (IOException e) {
-          LOG.warn("Could not close store file {}", file, e);
-        }
-      }
-      throw ioe;
-    }
-
-    // Should not archive the compacted store files when region warmup. See HBASE-22163.
-    if (!warmup) {
-      // Remove the compacted files from result
-      List<HStoreFile> filesToRemove = new ArrayList<>(compactedStoreFiles.size());
-      for (HStoreFile storeFile : results) {
-        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
-          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, this);
-          storeFile.getReader().close(storeFile.getCacheConf() != null ?
-            storeFile.getCacheConf().shouldEvictOnClose() : true);
-          filesToRemove.add(storeFile);
-        }
-      }
-      results.removeAll(filesToRemove);
-      if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
-        LOG.debug("Moving the files {} to archive", filesToRemove);
-        getRegionFileSystem().removeStoreFiles(this.getColumnFamilyDescriptor().getNameAsString(),
-            filesToRemove);
-      }
-    }
-
-    return results;
+  private void postRefreshStoreFiles() throws IOException {
+    // Advance the memstore read point to be at least the new store files seqIds so that
+    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
+    // in-flight transactions might be made visible)
+    getMaxSequenceId().ifPresent(region.getMVCC()::advanceTo);
+    refreshStoreSizeAndTotalBytes();
   }
 
   @Override
   public void refreshStoreFiles() throws IOException {
-    Collection<StoreFileInfo> newFiles = getRegionFileSystem().getStoreFiles(getColumnFamilyName());
-    refreshStoreFilesInternal(newFiles);
+    storeEngine.refreshStoreFiles();
+    postRefreshStoreFiles();
   }
 
   /**
@@ -616,89 +506,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * region replicas to keep up to date with the primary region files.
    */
   public void refreshStoreFiles(Collection<String> newFiles) throws IOException {
-    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
-    for (String file : newFiles) {
-      storeFiles.add(getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file));
-    }
-    refreshStoreFilesInternal(storeFiles);
-  }
-
-  /**
-   * Checks the underlying store files, and opens the files that  have not
-   * been opened, and removes the store file readers for store files no longer
-   * available. Mainly used by secondary region replicas to keep up to date with
-   * the primary region files.
-   */
-  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException {
-    StoreFileManager sfm = storeEngine.getStoreFileManager();
-    Collection<HStoreFile> currentFiles = sfm.getStorefiles();
-    Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles();
-    if (currentFiles == null) {
-      currentFiles = Collections.emptySet();
-    }
-    if (newFiles == null) {
-      newFiles = Collections.emptySet();
-    }
-    if (compactedFiles == null) {
-      compactedFiles = Collections.emptySet();
-    }
-
-    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size());
-    for (HStoreFile sf : currentFiles) {
-      currentFilesSet.put(sf.getFileInfo(), sf);
-    }
-    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size());
-    for (HStoreFile sf : compactedFiles) {
-      compactedFilesSet.put(sf.getFileInfo(), sf);
-    }
-
-    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
-    // Exclude the files that have already been compacted
-    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
-    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, currentFilesSet.keySet());
-    Set<StoreFileInfo> toBeRemovedFiles = Sets.difference(currentFilesSet.keySet(), newFilesSet);
-
-    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
-      return;
-    }
-
-    LOG.info("Refreshing store files for " + this + " files to add: "
-      + toBeAddedFiles + " files to remove: " + toBeRemovedFiles);
-
-    Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size());
-    for (StoreFileInfo sfi : toBeRemovedFiles) {
-      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
-    }
-
-    // try to open the files
-    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
-
-    // propogate the file changes to the underlying store file manager
-    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception
-
-    // Advance the memstore read point to be at least the new store files seqIds so that
-    // readers might pick it up. This assumes that the store is not getting any writes (otherwise
-    // in-flight transactions might be made visible)
-    if (!toBeAddedFiles.isEmpty()) {
-      // we must have the max sequence id here as we do have several store files
-      region.getMVCC().advanceTo(this.getMaxSequenceId().getAsLong());
-    }
-
-    refreshStoreSizeAndTotalBytes();
-  }
-
-  protected HStoreFile createStoreFileAndReader(final Path p) throws IOException {
-    StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(),
-        p, isPrimaryReplicaStore());
-    return createStoreFileAndReader(info);
-  }
-
-  private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException {
-    info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
-    HStoreFile storeFile = new HStoreFile(info, getColumnFamilyDescriptor().getBloomFilterType(),
-            getCacheConfig());
-    storeFile.initReader();
-    return storeFile;
+    storeEngine.refreshStoreFiles(newFiles);
+    postRefreshStoreFiles();
   }
 
   /**
@@ -721,7 +530,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * Adds a value to the memstore
    */
   public void add(final Cell cell, MemStoreSizing memstoreSizing) {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
         LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -729,7 +538,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
       this.memstore.add(cell, memstoreSizing);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
       currentParallelPutCount.decrementAndGet();
     }
   }
@@ -738,7 +547,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * Adds the specified value to the memstore
    */
   public void add(final Iterable<Cell> cells, MemStoreSizing memstoreSizing) {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       if (this.currentParallelPutCount.getAndIncrement() > this.parallelPutCountPrintThreshold) {
         LOG.trace("tableName={}, encodedName={}, columnFamilyName={} is too busy!",
@@ -746,7 +555,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       }
       memstore.add(cells, memstoreSizing);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
       currentParallelPutCount.decrementAndGet();
     }
   }
@@ -869,17 +678,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     LOG.info("Loaded HFile " + srcPath + " into " + this + " as "
         + dstPath + " - updating store file list.");
 
-    HStoreFile sf = createStoreFileAndReader(dstPath);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
     bulkLoadHFile(sf);
 
-    LOG.info("Successfully loaded {} into {} (new location: {})",
-        srcPath, this, dstPath);
+    LOG.info("Successfully loaded {} into {} (new location: {})", srcPath, this, dstPath);
 
     return dstPath;
   }
 
   public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
-    HStoreFile sf = createStoreFileAndReader(fileInfo);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(fileInfo);
     bulkLoadHFile(sf);
   }
 
@@ -887,28 +695,75 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     StoreFileReader r = sf.getReader();
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
-
-    // Append the new storefile into the list
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
-    } finally {
-      // We need the lock, as long as we are updating the storeFiles
-      // or changing the memstore. Let us release it before calling
-      // notifyChangeReadersObservers. See HBASE-4485 for a possible
-      // deadlock scenario that could have happened if continue to hold
-      // the lock.
-      this.lock.writeLock().unlock();
-    }
+    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
+    });
     LOG.info("Loaded HFile " + sf.getFileInfo() + " into " + this);
     if (LOG.isTraceEnabled()) {
-      String traceMessage = "BULK LOAD time,size,store size,store files ["
-          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
-          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+      String traceMessage = "BULK LOAD time,size,store size,store files [" +
+        EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize + "," +
+        storeEngine.getStoreFileManager().getStorefileCount() + "]";
       LOG.trace(traceMessage);
     }
   }
 
+  private ImmutableCollection<HStoreFile> closeWithoutLock() throws IOException {
+    // Clear so metrics doesn't find them.
+    ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
+    Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
+    // clear the compacted files
+    if (CollectionUtils.isNotEmpty(compactedfiles)) {
+      removeCompactedfiles(compactedfiles,
+        getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true);
+    }
+    if (!result.isEmpty()) {
+      // initialize the thread pool for closing store files in parallel.
+      ThreadPoolExecutor storeFileCloserThreadPool =
+        this.region.getStoreFileOpenAndCloseThreadPool("StoreFileCloser-" +
+          this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
+
+      // close each store file in parallel
+      CompletionService<Void> completionService =
+        new ExecutorCompletionService<>(storeFileCloserThreadPool);
+      for (HStoreFile f : result) {
+        completionService.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            boolean evictOnClose =
+              getCacheConfig() != null ? getCacheConfig().shouldEvictOnClose() : true;
+            f.closeStoreFile(evictOnClose);
+            return null;
+          }
+        });
+      }
+
+      IOException ioe = null;
+      try {
+        for (int i = 0; i < result.size(); i++) {
+          try {
+            Future<Void> future = completionService.take();
+            future.get();
+          } catch (InterruptedException e) {
+            if (ioe == null) {
+              ioe = new InterruptedIOException();
+              ioe.initCause(e);
+            }
+          } catch (ExecutionException e) {
+            if (ioe == null) {
+              ioe = new IOException(e.getCause());
+            }
+          }
+        }
+      } finally {
+        storeFileCloserThreadPool.shutdownNow();
+      }
+      if (ioe != null) {
+        throw ioe;
+      }
+    }
+    LOG.trace("Closed {}", this);
+    return result;
+  }
+
   /**
    * Close all the readers We don't need to worry about subsequent requests because the Region holds
    * a write lock that will prevent any more reads or writes.
@@ -916,67 +771,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
    * @throws IOException on failure
    */
   public ImmutableCollection<HStoreFile> close() throws IOException {
+    // findbugs can not recognize storeEngine.writeLock is just a lock operation so it will report
+    // UL_UNRELEASED_LOCK_EXCEPTION_PATH, so here we have to use two try finally...
+    // Change later if findbugs becomes smarter in the future.
     this.archiveLock.lock();
-    this.lock.writeLock().lock();
     try {
-      // Clear so metrics doesn't find them.
-      ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-      Collection<HStoreFile> compactedfiles =
-          storeEngine.getStoreFileManager().clearCompactedFiles();
-      // clear the compacted files
-      if (CollectionUtils.isNotEmpty(compactedfiles)) {
-        removeCompactedfiles(compactedfiles, getCacheConfig() != null ?
-            getCacheConfig().shouldEvictOnClose() : true);
-      }
-      if (!result.isEmpty()) {
-        // initialize the thread pool for closing store files in parallel.
-        ThreadPoolExecutor storeFileCloserThreadPool = this.region
-            .getStoreFileOpenAndCloseThreadPool("StoreFileCloser-"
-              + this.region.getRegionInfo().getEncodedName() + "-" + this.getColumnFamilyName());
-
-        // close each store file in parallel
-        CompletionService<Void> completionService =
-          new ExecutorCompletionService<>(storeFileCloserThreadPool);
-        for (HStoreFile f : result) {
-          completionService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws IOException {
-              boolean evictOnClose =
-                  getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
-              f.closeStoreFile(evictOnClose);
-              return null;
-            }
-          });
-        }
-
-        IOException ioe = null;
-        try {
-          for (int i = 0; i < result.size(); i++) {
-            try {
-              Future<Void> future = completionService.take();
-              future.get();
-            } catch (InterruptedException e) {
-              if (ioe == null) {
-                ioe = new InterruptedIOException();
-                ioe.initCause(e);
-              }
-            } catch (ExecutionException e) {
-              if (ioe == null) {
-                ioe = new IOException(e.getCause());
-              }
-            }
-          }
-        } finally {
-          storeFileCloserThreadPool.shutdownNow();
-        }
-        if (ioe != null) {
-          throw ioe;
-        }
+      this.storeEngine.writeLock();
+      try {
+        return closeWithoutLock();
+      } finally {
+        this.storeEngine.writeUnlock();
       }
-      LOG.trace("Closed {}", this);
-      return result;
     } finally {
-      this.lock.writeLock().unlock();
       this.archiveLock.unlock();
     }
   }
@@ -1006,7 +812,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         try {
           for (Path pathName : pathNames) {
             lastPathName = pathName;
-            validateStoreFile(pathName);
+            storeEngine.validateStoreFile(pathName);
           }
           return pathNames;
         } catch (Exception e) {
@@ -1052,204 +858,37 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     }
 
     Path dstPath = getRegionFileSystem().commitStoreFile(getColumnFamilyName(), path);
-    HStoreFile sf = createStoreFileAndReader(dstPath);
+    HStoreFile sf = storeEngine.createStoreFileAndReader(dstPath);
     StoreFileReader r = sf.getReader();
     this.storeSize.addAndGet(r.length());
     this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
 
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
-    } finally {
-      this.lock.writeLock().unlock();
-    }
+    storeEngine.addStoreFiles(Lists.newArrayList(sf), () -> {
+    });
 
     LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
       r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
     return sf;
   }
 
-  /**
-   * Commit the given {@code files}.
-   * <p/>
-   * We will move the file into data directory, and open it.
-   * @param files the files want to commit
-   * @param validate whether to validate the store files
-   * @return the committed store files
-   */
-  private List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) throws IOException {
-    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
-    HRegionFileSystem hfs = getRegionFileSystem();
-    String familyName = getColumnFamilyName();
-    for (Path file : files) {
-      try {
-        if (validate) {
-          validateStoreFile(file);
-        }
-        Path committedPath = hfs.commitStoreFile(familyName, file);
-        HStoreFile sf = createStoreFileAndReader(committedPath);
-        committedFiles.add(sf);
-      } catch (IOException e) {
-        LOG.error("Failed to commit store file {}", file, e);
-        // Try to delete the files we have committed before.
-        // It is OK to fail when deleting as leaving the file there does not cause any data
-        // corruption problem. It just introduces some duplicated data which may impact read
-        // performance a little when reading before compaction.
-        for (HStoreFile sf : committedFiles) {
-          Path pathToDelete = sf.getPath();
-          try {
-            sf.deleteStoreFile();
-          } catch (IOException deleteEx) {
-            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store file {}", pathToDelete,
-              deleteEx);
-          }
-        }
-        throw new IOException("Failed to commit the flush", e);
-      }
-    }
-    return committedFiles;
-  }
-
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-    boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-    boolean shouldDropBehind) throws IOException {
-    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
-      includesTag, shouldDropBehind, -1, HConstants.EMPTY_STRING);
-  }
-
-  /**
-   * @param compression Compression algorithm to use
-   * @param isCompaction whether we are creating a new file in a compaction
-   * @param includeMVCCReadpoint - whether to include MVCC or not
-   * @param includesTag - includesTag or not
-   * @return Writer for a new StoreFile in the tmp dir.
-   */
-  // TODO : allow the Writer factory to create Writers of ShipperListener type only in case of
-  // compaction
-  public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
-      boolean shouldDropBehind, long totalCompactedFilesSize, String fileStoragePolicy)
-        throws IOException {
-    // creating new cache config for each new writer
-    final CacheConfig cacheConf = getCacheConfig();
-    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
-    if (isCompaction) {
-      // Don't cache data on write on compactions, unless specifically configured to do so
-      // Cache only when total file size remains lower than configured threshold
-      final boolean cacheCompactedBlocksOnWrite =
-        getCacheConfig().shouldCacheCompactedBlocksOnWrite();
-      // if data blocks are to be cached on write
-      // during compaction, we should forcefully
-      // cache index and bloom blocks as well
-      if (cacheCompactedBlocksOnWrite && totalCompactedFilesSize <= cacheConf
-        .getCacheCompactedBlocksOnWriteThreshold()) {
-        writerCacheConf.enableCacheOnWrite();
-        if (!cacheOnWriteLogged) {
-          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence enabled " +
-              "cacheOnWrite for Data blocks, Index blocks and Bloom filter blocks", this);
-          cacheOnWriteLogged = true;
-        }
-      } else {
-        writerCacheConf.setCacheDataOnWrite(false);
-        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
-          // checking condition once again for logging
-          LOG.debug(
-            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted "
-              + "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
-            this, totalCompactedFilesSize,
-            cacheConf.getCacheCompactedBlocksOnWriteThreshold());
-        }
-      }
-    } else {
-      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
-      if (shouldCacheDataOnWrite) {
-        writerCacheConf.enableCacheOnWrite();
-        if (!cacheOnWriteLogged) {
-          LOG.info("For {} , cacheDataOnWrite is true, hence enabled cacheOnWrite for " +
-            "Index blocks and Bloom filter blocks", this);
-          cacheOnWriteLogged = true;
-        }
-      }
-    }
-    Encryption.Context encryptionContext = storeContext.getEncryptionContext();
-    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
-      encryptionContext);
-    Path familyTempDir = new Path(getRegionFileSystem().getTempDir(), getColumnFamilyName());
-    StoreFileWriter.Builder builder =
-      new StoreFileWriter.Builder(conf, writerCacheConf, getFileSystem())
-        .withOutputDir(familyTempDir)
-        .withBloomType(storeContext.getBloomFilterType())
-        .withMaxKeyCount(maxKeyCount)
-        .withFavoredNodes(storeContext.getFavoredNodes())
-        .withFileContext(hFileContext)
-        .withShouldDropCacheBehind(shouldDropBehind)
-        .withCompactedFilesSupplier(storeContext.getCompactedFilesSupplier())
-        .withFileStoragePolicy(fileStoragePolicy);
-    return builder.build();
-  }
-
-  private HFileContext createFileContext(Compression.Algorithm compression,
-    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
-    if (compression == null) {
-      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
-    }
-    ColumnFamilyDescriptor family = getColumnFamilyDescriptor();
-    HFileContext hFileContext = new HFileContextBuilder()
-      .withIncludesMvcc(includeMVCCReadpoint)
-      .withIncludesTags(includesTag)
-      .withCompression(compression)
-      .withCompressTags(family.isCompressTags())
-      .withChecksumType(StoreUtils.getChecksumType(conf))
-      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
-      .withBlockSize(family.getBlocksize())
-      .withHBaseCheckSum(true)
-      .withDataBlockEncoding(family.getDataBlockEncoding())
-      .withEncryptionContext(encryptionContext)
-      .withCreateTime(EnvironmentEdgeManager.currentTime())
-      .withColumnFamily(getColumnFamilyDescriptor().getName())
-      .withTableName(getTableName().getName())
-      .withCellComparator(getComparator())
-      .build();
-    return hFileContext;
-  }
-
   private long getTotalSize(Collection<HStoreFile> sfs) {
     return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum();
   }
 
-  /**
-   * Change storeFiles adding into place the Reader produced by this new flush.
-   * @param sfs Store files
-   * @return Whether compaction is required.
-   */
-  private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException {
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
-      /**
-       * NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
-       * close {@link DefaultMemStore#snapshot}, which may be used by
-       * {@link DefaultMemStore#getScanners}.
-       */
-      if (snapshotId > 0) {
-        this.memstore.clearSnapshot(snapshotId);
-      }
-    } finally {
-      // We need the lock, as long as we are updating the storeFiles
-      // or changing the memstore. Let us release it before calling
-      // notifyChangeReadersObservers. See HBASE-4485 for a possible
-      // deadlock scenario that could have happened if continue to hold
-      // the lock.
-      this.lock.writeLock().unlock();
-    }
-
+  private boolean completeFlush(List<HStoreFile> sfs, long snapshotId) throws IOException {
+    // NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
+    // close {@link DefaultMemStore#snapshot}, which may be used by
+    // {@link DefaultMemStore#getScanners}.
+    storeEngine.addStoreFiles(sfs,
+      snapshotId > 0 ? () -> this.memstore.clearSnapshot(snapshotId) : () -> {
+      });
     // notify to be called here - only in case of flushes
     notifyChangedReadersObservers(sfs);
     if (LOG.isTraceEnabled()) {
       long totalSize = getTotalSize(sfs);
-      String traceMessage = "FLUSH time,count,size,store size,store files ["
-          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
-          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
+      String traceMessage = "FLUSH time,count,size,store size,store files [" +
+        EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," +
+        storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
       LOG.trace(traceMessage);
     }
     return needsCompaction();
@@ -1261,11 +900,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException {
     for (ChangedReadersObserver o : this.changedReaderObservers) {
       List<KeyValueScanner> memStoreScanners;
-      this.lock.readLock().lock();
+      this.storeEngine.readLock();
       try {
         memStoreScanners = this.memstore.getScanners(o.getReadPoint());
       } finally {
-        this.lock.readLock().unlock();
+        this.storeEngine.readUnlock();
       }
       o.updateReaders(sfs, memStoreScanners);
     }
@@ -1307,13 +946,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       byte[] stopRow, boolean includeStopRow, long readPt) throws IOException {
     Collection<HStoreFile> storeFilesToScan;
     List<KeyValueScanner> memStoreScanners;
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       storeFilesToScan = this.storeEngine.getStoreFileManager().getFilesForScan(startRow,
         includeStartRow, stopRow, includeStopRow);
       memStoreScanners = this.memstore.getScanners(readPt);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     try {
@@ -1390,11 +1029,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       boolean includeMemstoreScanner) throws IOException {
     List<KeyValueScanner> memStoreScanners = null;
     if (includeMemstoreScanner) {
-      this.lock.readLock().lock();
+      this.storeEngine.readLock();
       try {
         memStoreScanners = this.memstore.getScanners(readPt);
       } finally {
-        this.lock.readLock().unlock();
+        this.storeEngine.readUnlock();
       }
     }
     try {
@@ -1510,14 +1149,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       List<Path> newFiles) throws IOException {
     // Do the steps necessary to complete the compaction.
     setStoragePolicyFromFileName(newFiles);
-    List<HStoreFile> sfs = commitStoreFiles(newFiles, true);
+    List<HStoreFile> sfs = storeEngine.commitStoreFiles(newFiles, true);
     if (this.getCoprocessorHost() != null) {
       for (HStoreFile sf : sfs) {
         getCoprocessorHost().postCompact(this, sf, cr.getTracker(), cr, user);
       }
     }
-    writeCompactionWalRecord(filesToCompact, sfs);
-    replaceStoreFiles(filesToCompact, sfs);
+    replaceStoreFiles(filesToCompact, sfs, true);
     if (cr.isMajor()) {
       majorCompactedCellsCount.addAndGet(getCompactionProgress().getTotalCompactingKVs());
       majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
@@ -1581,25 +1219,24 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
         this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
   }
 
-  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
-      throws IOException {
-    this.lock.writeLock().lock();
-    try {
-      this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result);
-      synchronized (filesCompacting) {
-        filesCompacting.removeAll(compactedFiles);
-      }
-
-      // These may be null when the RS is shutting down. The space quota Chores will fix the Region
-      // sizes later so it's not super-critical if we miss these.
-      RegionServerServices rsServices = region.getRegionServerServices();
-      if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
-        updateSpaceQuotaAfterFileReplacement(
-            rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
-            compactedFiles, result);
-      }
-    } finally {
-      this.lock.writeLock().unlock();
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = "",
+    allowedOnPath = ".*/(HStore|TestHStore).java")
+  void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result,
+    boolean writeCompactionMarker) throws IOException {
+    storeEngine.replaceStoreFiles(compactedFiles, result);
+    if (writeCompactionMarker) {
+      writeCompactionWalRecord(compactedFiles, result);
+    }
+    synchronized (filesCompacting) {
+      filesCompacting.removeAll(compactedFiles);
+    }
+    // These may be null when the RS is shutting down. The space quota Chores will fix the Region
+    // sizes later so it's not super-critical if we miss these.
+    RegionServerServices rsServices = region.getRegionServerServices();
+    if (rsServices != null && rsServices.getRegionServerSpaceQuotaManager() != null) {
+      updateSpaceQuotaAfterFileReplacement(
+        rsServices.getRegionServerSpaceQuotaManager().getRegionSizeStore(), getRegionInfo(),
+        compactedFiles, result);
     }
   }
 
@@ -1722,7 +1359,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
       for (String compactionOutput : compactionOutputs) {
         StoreFileInfo storeFileInfo =
             getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), compactionOutput);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+        HStoreFile storeFile = storeEngine.createStoreFileAndReader(storeFileInfo);
         outputStoreFiles.add(storeFile);
       }
     }
@@ -1730,7 +1367,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     if (!inputStoreFiles.isEmpty() || !outputStoreFiles.isEmpty()) {
       LOG.info("Replaying compaction marker, replacing input files: " +
           inputStoreFiles + " with output files : " + outputStoreFiles);
-      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles);
+      this.replaceStoreFiles(inputStoreFiles, outputStoreFiles, false);
       this.refreshStoreSizeAndTotalBytes();
     }
   }
@@ -1739,14 +1376,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
   public boolean hasReferences() {
     // Grab the read lock here, because we need to ensure that: only when the atomic
     // replaceStoreFiles(..) finished, we can get all the complete store file list.
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       // Merge the current store files with compacted files here due to HBASE-20940.
       Collection<HStoreFile> allStoreFiles = new ArrayList<>(getStorefiles());
       allStoreFiles.addAll(getCompactedFiles());
       return StoreUtils.hasReferences(allStoreFiles);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -1786,7 +1423,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
 
     final CompactionContext compaction = storeEngine.createCompaction();
     CompactionRequestImpl request = null;
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       synchronized (filesCompacting) {
         // First, see if coprocessor would want to override selection.
@@ -1859,7 +1496,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
         request.setTracker(tracker);
       }
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     if (LOG.isDebugEnabled()) {
@@ -1892,7 +1529,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
           this, getColumnFamilyDescriptor().getMinVersions());
       return;
     }
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     Collection<HStoreFile> delSfs = null;
     try {
       synchronized (filesCompacting) {
@@ -1904,7 +1541,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
         }
       }
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
 
     if (CollectionUtils.isEmpty(delSfs)) {
@@ -1912,8 +1549,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     }
 
     Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
-    writeCompactionWalRecord(delSfs, newFiles);
-    replaceStoreFiles(delSfs, newFiles);
+    replaceStoreFiles(delSfs, newFiles, true);
     refreshStoreSizeAndTotalBytes();
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) 
file(s) in "
         + this + "; total size is "
@@ -1936,25 +1572,6 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
   }
 
   /**
-   * Validates a store file by opening and closing it. In HFileV2 this should 
not be an expensive
-   * operation.
-   * @param path the path to the store file
-   */
-  private void validateStoreFile(Path path) throws IOException {
-    HStoreFile storeFile = null;
-    try {
-      storeFile = createStoreFileAndReader(path);
-    } catch (IOException e) {
-      LOG.error("Failed to open store file : {}, keeping it in tmp location", 
path, e);
-      throw e;
-    } finally {
-      if (storeFile != null) {
-        storeFile.closeStoreFile(false);
-      }
-    }
-  }
-
-  /**
    * Update counts.
    */
   protected void refreshStoreSizeAndTotalBytes()
@@ -1999,7 +1616,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
    * Determines if Store should be split.
    */
   public Optional<byte[]> getSplitPoint() {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       // Should already be enforced by the split policy!
       assert !this.getRegionInfo().isMetaRegion();
@@ -2012,7 +1629,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     } catch(IOException e) {
       LOG.warn("Failed getting store size for {}", this, e);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
     return Optional.empty();
   }
@@ -2045,7 +1662,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
    */
   public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> 
targetCols, long readPt)
       throws IOException {
-    lock.readLock().lock();
+    storeEngine.readLock();
     try {
       ScanInfo scanInfo;
       if (this.getCoprocessorHost() != null) {
@@ -2055,7 +1672,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
       }
       return createScanner(scan, scanInfo, targetCols, readPt);
     } finally {
-      lock.readLock().unlock();
+      storeEngine.readUnlock();
     }
   }
 
@@ -2085,7 +1702,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
       boolean cacheBlocks, boolean usePread, boolean isCompaction, 
ScanQueryMatcher matcher,
       byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean 
includeStopRow, long readPt,
       boolean includeMemstoreScanner) throws IOException {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       Map<String, HStoreFile> name2File =
           new HashMap<>(getStorefilesCount() + getCompactedFilesCount());
@@ -2110,7 +1727,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
       return getScanners(filesToReopen, cacheBlocks, false, false, matcher, 
startRow,
         includeStartRow, stopRow, includeStopRow, readPt, false);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -2176,41 +1793,20 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
   @Override
   public long getStorefilesSize() {
     // Include all StoreFiles
-    return 
getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(), sf -> 
true);
+    return 
StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+      sf -> true);
   }
 
   @Override
   public long getHFilesSize() {
     // Include only StoreFiles which are HFiles
-    return 
getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
+    return 
StoreUtils.getStorefilesSize(this.storeEngine.getStoreFileManager().getStorefiles(),
       HStoreFile::isHFile);
   }
 
-  private long getTotalUncompressedBytes(List<HStoreFile> files) {
-    return files.stream()
-      .mapToLong(file -> getStorefileFieldSize(file, 
StoreFileReader::getTotalUncompressedBytes))
-      .sum();
-  }
-
-  private long getStorefilesSize(Collection<HStoreFile> files, 
Predicate<HStoreFile> predicate) {
-    return files.stream().filter(predicate)
-      .mapToLong(file -> getStorefileFieldSize(file, 
StoreFileReader::length)).sum();
-  }
-
-  private long getStorefileFieldSize(HStoreFile file, 
ToLongFunction<StoreFileReader> f) {
-    if (file == null) {
-      return 0L;
-    }
-    StoreFileReader reader = file.getReader();
-    if (reader == null) {
-      return 0L;
-    }
-    return f.applyAsLong(reader);
-  }
-
   private long getStorefilesFieldSize(ToLongFunction<StoreFileReader> f) {
     return this.storeEngine.getStoreFileManager().getStorefiles().stream()
-      .mapToLong(file -> getStorefileFieldSize(file, f)).sum();
+      .mapToLong(file -> StoreUtils.getStorefileFieldSize(file, f)).sum();
   }
 
   @Override
@@ -2281,11 +1877,11 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
    */
   public void upsert(Iterable<Cell> cells, long readpoint, MemStoreSizing 
memstoreSizing)
       throws IOException {
-    this.lock.readLock().lock();
+    this.storeEngine.readLock();
     try {
       this.memstore.upsert(cells, readpoint, memstoreSizing);
     } finally {
-      this.lock.readLock().unlock();
+      this.storeEngine.readUnlock();
     }
   }
 
@@ -2338,7 +1934,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
         return false;
       }
       status.setStatus("Flushing " + this + ": reopening flushed file");
-      List<HStoreFile> storeFiles = commitStoreFiles(tempFiles, false);
+      List<HStoreFile> storeFiles = storeEngine.commitStoreFiles(tempFiles, 
false);
       for (HStoreFile sf : storeFiles) {
         StoreFileReader r = sf.getReader();
         if (LOG.isInfoEnabled()) {
@@ -2361,7 +1957,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
         }
       }
       // Add new file to store files. Clear snapshot too while we have the 
Store write lock.
-      return updateStorefiles(storeFiles, snapshot.getId());
+      return completeFlush(storeFiles, snapshot.getId());
     }
 
     @Override
@@ -2389,7 +1985,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
         // open the file as a store file (hfile link, etc)
         StoreFileInfo storeFileInfo =
           getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
-        HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
+        HStoreFile storeFile = 
storeEngine.createStoreFileAndReader(storeFileInfo);
         storeFiles.add(storeFile);
         HStore.this.storeSize.addAndGet(storeFile.getReader().length());
         HStore.this.totalUncompressedBytes
@@ -2405,7 +2001,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
       if (dropMemstoreSnapshot && snapshot != null) {
         snapshotId = snapshot.getId();
       }
-      HStore.this.updateStorefiles(storeFiles, snapshotId);
+      HStore.this.completeFlush(storeFiles, snapshotId);
     }
 
     /**
@@ -2414,7 +2010,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     @Override
     public void abort() throws IOException {
       if (snapshot != null) {
-        HStore.this.updateStorefiles(Collections.emptyList(), 
snapshot.getId());
+        HStore.this.completeFlush(Collections.emptyList(), snapshot.getId());
       }
     }
   }
@@ -2577,7 +2173,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
-      lock.readLock().lock();
+      storeEngine.readLock();
       Collection<HStoreFile> copyCompactedfiles = null;
       try {
         Collection<HStoreFile> compactedfiles =
@@ -2589,7 +2185,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
           LOG.trace("No compacted files to archive");
         }
       } finally {
-        lock.readLock().unlock();
+        storeEngine.readUnlock();
       }
       if (CollectionUtils.isNotEmpty(copyCompactedfiles)) {
         removeCompactedfiles(copyCompactedfiles, true);
@@ -2724,12 +2320,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
 
   private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws 
IOException {
     LOG.trace("Clearing the compacted file {} from this store", filesToRemove);
-    try {
-      lock.writeLock().lock();
-      
this.getStoreEngine().getStoreFileManager().removeCompactedFiles(filesToRemove);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    storeEngine.removeCompactedFiles(filesToRemove);
   }
 
   void reportArchivedFilesForQuota(List<? extends StoreFile> archivedFiles, 
List<Long> fileSizes) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
index 2623350..2a9f968 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreContext.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -108,6 +109,14 @@ public final class StoreContext implements HeapSize {
     return coprocessorHost;
   }
 
+  public RegionInfo getRegionInfo() {
+    return regionFileSystem.getRegionInfo();
+  }
+
+  public boolean isPrimaryReplicaStore() {
+    return getRegionInfo().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID;
+  }
+
   public static Builder getBuilder() {
     return new Builder();
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
index 60b3c3d..4033c33 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
@@ -19,38 +19,131 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
+import 
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 /**
- * StoreEngine is a factory that can create the objects necessary for HStore 
to operate.
- * Since not all compaction policies, compactors and store file managers are 
compatible,
- * they are tied together and replaced together via StoreEngine-s.
+ * StoreEngine is a factory that can create the objects necessary for HStore 
to operate. Since not
+ * all compaction policies, compactors and store file managers are compatible, 
they are tied
+ * together and replaced together via StoreEngine-s.
+ * <p/>
+ * We expose read/write lock methods to the upper layer for store operations:<br/>
+ * <ul>
+ * <li>Locked in shared mode when the list of component stores is looked at:
+ * <ul>
+ * <li>all reads/writes to table data</li>
+ * <li>checking for split</li>
+ * </ul>
+ * </li>
+ * <li>Locked in exclusive mode when the list of component stores is modified:
+ * <ul>
+ * <li>closing</li>
+ * <li>completing a compaction</li>
+ * </ul>
+ * </li>
+ * </ul>
+ * <p/>
+ * It may be confusing that we have both a StoreFileManager (SFM) and a
+ * StoreFileTracker (SFT). As its name says, the SFT tracks the store file list.
+ * The reason we have an SFT besides the SFM is that, when introducing stripe
+ * compaction, we added the StoreEngine together with the SFM, but the SFM is not
+ * a general 'Manager': it only manages the in-memory 'stripes', so we can select
+ * different store files when scanning or compacting. Before the SFT, the
+ * 'tracking' of store files was actually done in
+ * {@link org.apache.hadoop.hbase.regionserver.HRegionFileSystem} and
+ * {@link HStore}. Since the SFM only holds in-memory state, we take the write
+ * lock when updating it, and the same lock also protects normal read/write
+ * requests, so we should not add IO operations to the SFM. Also, no matter what
+ * the in-memory state is, stripe or not, it does not affect how we track the
+ * store files. Considering all these facts, we introduce a separate SFT to track
+ * the store files.
+ * <p/>
+ * Since we almost always need to update the SFM and the SFT at the same time,
+ * StoreEngine exposes methods that update both, so the upper layer only needs to
+ * call StoreEngine once, reducing the chance of misuse.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine<SF extends StoreFlusher,
-    CP extends CompactionPolicy, C extends Compactor, SFM extends 
StoreFileManager> {
+public abstract class StoreEngine<SF extends StoreFlusher, CP extends 
CompactionPolicy,
+  C extends Compactor, SFM extends StoreFileManager> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreEngine.class);
+
   protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;
+  private Configuration conf;
+  private StoreContext ctx;
+  private RegionCoprocessorHost coprocessorHost;
+  private Function<String, ExecutorService> openStoreFileThreadPoolCreator;
+  private StoreFileTracker storeFileTracker;
+
+  private final ReadWriteLock storeLock = new ReentrantReadWriteLock();
 
   /**
-   * The name of the configuration parameter that specifies the class of
-   * a store engine that is used to manage and compact HBase store files.
+   * The name of the configuration parameter that specifies the class of a 
store engine that is used
+   * to manage and compact HBase store files.
    */
   public static final String STORE_ENGINE_CLASS_KEY = 
"hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
-    DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>> 
DEFAULT_STORE_ENGINE_CLASS =
+    DefaultStoreEngine.class;
+
+  /**
+   * Acquire read lock of this store.
+   */
+  public void readLock() {
+    storeLock.readLock().lock();
+  }
+
+  /**
+   * Release read lock of this store.
+   */
+  public void readUnlock() {
+    storeLock.readLock().unlock();
+  }
+
+  /**
+   * Acquire write lock of this store.
+   */
+  public void writeLock() {
+    storeLock.writeLock().lock();
+  }
+
+  /**
+   * Release write lock of this store.
+   */
+  public void writeUnlock() {
+    storeLock.writeLock().unlock();
+  }
 
   /**
    * @return Compaction policy to use.
@@ -80,6 +173,11 @@ public abstract class StoreEngine<SF extends StoreFlusher,
     return this.storeFlusher;
   }
 
+  private StoreFileTracker createStoreFileTracker(HStore store) {
+    return StoreFileTrackerFactory.create(store.conf, 
store.getRegionInfo().getTable(),
+      store.isPrimaryReplicaStore(), store.getStoreContext());
+  }
+
   /**
    * @param filesCompacting Files currently compacting
    * @return whether a compaction selection is possible
@@ -87,8 +185,8 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   public abstract boolean needsCompaction(List<HStoreFile> filesCompacting);
 
   /**
-   * Creates an instance of a compaction context specific to this engine.
-   * Doesn't actually select or start a compaction. See CompactionContext 
class comment.
+   * Creates an instance of a compaction context specific to this engine. 
Doesn't actually select or
+   * start a compaction. See CompactionContext class comment.
    * @return New CompactionContext object.
    */
   public abstract CompactionContext createCompaction() throws IOException;
@@ -96,36 +194,347 @@ public abstract class StoreEngine<SF extends StoreFlusher,
   /**
    * Create the StoreEngine's components.
    */
-  protected abstract void createComponents(
-      Configuration conf, HStore store, CellComparator cellComparator) throws 
IOException;
+  protected abstract void createComponents(Configuration conf, HStore store,
+    CellComparator cellComparator) throws IOException;
 
-  private void createComponentsOnce(
-      Configuration conf, HStore store, CellComparator cellComparator) throws 
IOException {
-    assert compactor == null && compactionPolicy == null
-        && storeFileManager == null && storeFlusher == null;
+  protected final void createComponentsOnce(Configuration conf, HStore store,
+    CellComparator cellComparator) throws IOException {
+    assert compactor == null && compactionPolicy == null && storeFileManager 
== null &&
+      storeFlusher == null && storeFileTracker == null;
     createComponents(conf, store, cellComparator);
-    assert compactor != null && compactionPolicy != null
-        && storeFileManager != null && storeFlusher != null;
+    this.conf = conf;
+    this.ctx = store.getStoreContext();
+    this.coprocessorHost = store.getHRegion().getCoprocessorHost();
+    this.openStoreFileThreadPoolCreator = 
store.getHRegion()::getStoreFileOpenAndCloseThreadPool;
+    this.storeFileTracker = createStoreFileTracker(store);
+    assert compactor != null && compactionPolicy != null && storeFileManager 
!= null &&
+      storeFlusher != null && storeFileTracker != null;
+  }
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  public StoreFileWriter createWriter(CreateStoreFileWriterParams params) 
throws IOException {
+    return storeFileTracker.createWriter(params);
+  }
+
+  public HStoreFile createStoreFileAndReader(Path p) throws IOException {
+    StoreFileInfo info = new StoreFileInfo(conf, 
ctx.getRegionFileSystem().getFileSystem(), p,
+      ctx.isPrimaryReplicaStore());
+    return createStoreFileAndReader(info);
+  }
+
+  public HStoreFile createStoreFileAndReader(StoreFileInfo info) throws 
IOException {
+    info.setRegionCoprocessorHost(coprocessorHost);
+    HStoreFile storeFile =
+      new HStoreFile(info, ctx.getFamily().getBloomFilterType(), 
ctx.getCacheConf());
+    storeFile.initReader();
+    return storeFile;
+  }
+
+  /**
+   * Validates a store file by opening and closing it. In HFileV2 this should 
not be an expensive
+   * operation.
+   * @param path the path to the store file
+   */
+  public void validateStoreFile(Path path) throws IOException {
+    HStoreFile storeFile = null;
+    try {
+      storeFile = createStoreFileAndReader(path);
+    } catch (IOException e) {
+      LOG.error("Failed to open store file : {}, keeping it in tmp location", 
path, e);
+      throw e;
+    } finally {
+      if (storeFile != null) {
+        storeFile.closeStoreFile(false);
+      }
+    }
+  }
+
+  private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files, 
boolean warmup)
+    throws IOException {
+    if (CollectionUtils.isEmpty(files)) {
+      return Collections.emptyList();
+    }
+    // initialize the thread pool for opening store files in parallel.
+    ExecutorService storeFileOpenerThreadPool =
+      openStoreFileThreadPoolCreator.apply("StoreFileOpener-" +
+        ctx.getRegionInfo().getEncodedName() + "-" + 
ctx.getFamily().getNameAsString());
+    CompletionService<HStoreFile> completionService =
+      new ExecutorCompletionService<>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
+    for (StoreFileInfo storeFileInfo : files) {
+      // The StoreFileInfo will carry store configuration down to HFile, we 
need to set it to
+      // our store's CompoundConfiguration here.
+      storeFileInfo.setConf(conf);
+      // open each store file in parallel
+      completionService.submit(() -> createStoreFileAndReader(storeFileInfo));
+      totalValidStoreFile++;
+    }
+
+    Set<String> compactedStoreFiles = new HashSet<>();
+    ArrayList<HStoreFile> results = new ArrayList<>(files.size());
+    IOException ioe = null;
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        try {
+          HStoreFile storeFile = completionService.take().get();
+          if (storeFile != null) {
+            LOG.debug("loaded {}", storeFile);
+            results.add(storeFile);
+            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
+          }
+        } catch (InterruptedException e) {
+          if (ioe == null) {
+            ioe = new InterruptedIOException(e.getMessage());
+          }
+        } catch (ExecutionException e) {
+          if (ioe == null) {
+            ioe = new IOException(e.getCause());
+          }
+        }
+      }
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
+    }
+    if (ioe != null) {
+      // close StoreFile readers
+      boolean evictOnClose =
+        ctx.getCacheConf() != null ? ctx.getCacheConf().shouldEvictOnClose() : 
true;
+      for (HStoreFile file : results) {
+        try {
+          if (file != null) {
+            file.closeStoreFile(evictOnClose);
+          }
+        } catch (IOException e) {
+          LOG.warn("Could not close store file {}", file, e);
+        }
+      }
+      throw ioe;
+    }
+
+    // Should not archive the compacted store files during region warmup. See HBASE-22163.
+    if (!warmup) {
+      // Remove the compacted files from result
+      List<HStoreFile> filesToRemove = new 
ArrayList<>(compactedStoreFiles.size());
+      for (HStoreFile storeFile : results) {
+        if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+          LOG.warn("Clearing the compacted storefile {} from {}", storeFile, 
this);
+          storeFile.getReader().close(
+            storeFile.getCacheConf() != null ? 
storeFile.getCacheConf().shouldEvictOnClose() :
+              true);
+          filesToRemove.add(storeFile);
+        }
+      }
+      results.removeAll(filesToRemove);
+      if (!filesToRemove.isEmpty() && ctx.isPrimaryReplicaStore()) {
+        LOG.debug("Moving the files {} to archive", filesToRemove);
+        
ctx.getRegionFileSystem().removeStoreFiles(ctx.getFamily().getNameAsString(),
+          filesToRemove);
+      }
+    }
+
+    return results;
+  }
+
+  public void initialize(boolean warmup) throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    List<HStoreFile> files = openStoreFiles(fileInfos, warmup);
+    storeFileManager.loadFiles(files);
+  }
+
+  public void refreshStoreFiles() throws IOException {
+    List<StoreFileInfo> fileInfos = storeFileTracker.load();
+    refreshStoreFilesInternal(fileInfos);
+  }
+
+  public void refreshStoreFiles(Collection<String> newFiles) throws 
IOException {
+    List<StoreFileInfo> storeFiles = new ArrayList<>(newFiles.size());
+    for (String file : newFiles) {
+      storeFiles
+        
.add(ctx.getRegionFileSystem().getStoreFileInfo(ctx.getFamily().getNameAsString(),
 file));
+    }
+    refreshStoreFilesInternal(storeFiles);
+  }
+
+  /**
+   * Checks the underlying store files, opens the files that have not been opened,
+   * and removes the store file readers for store files that are no longer
+   * available. Mainly used by secondary region replicas to keep up to date with
+   * the primary region's files.
+   */
+  private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) 
throws IOException {
+    Collection<HStoreFile> currentFiles = storeFileManager.getStorefiles();
+    Collection<HStoreFile> compactedFiles = 
storeFileManager.getCompactedfiles();
+    if (currentFiles == null) {
+      currentFiles = Collections.emptySet();
+    }
+    if (newFiles == null) {
+      newFiles = Collections.emptySet();
+    }
+    if (compactedFiles == null) {
+      compactedFiles = Collections.emptySet();
+    }
+
+    HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new 
HashMap<>(currentFiles.size());
+    for (HStoreFile sf : currentFiles) {
+      currentFilesSet.put(sf.getFileInfo(), sf);
+    }
+    HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new 
HashMap<>(compactedFiles.size());
+    for (HStoreFile sf : compactedFiles) {
+      compactedFilesSet.put(sf.getFileInfo(), sf);
+    }
+
+    Set<StoreFileInfo> newFilesSet = new HashSet<StoreFileInfo>(newFiles);
+    // Exclude the files that have already been compacted
+    newFilesSet = Sets.difference(newFilesSet, compactedFilesSet.keySet());
+    Set<StoreFileInfo> toBeAddedFiles = Sets.difference(newFilesSet, 
currentFilesSet.keySet());
+    Set<StoreFileInfo> toBeRemovedFiles = 
Sets.difference(currentFilesSet.keySet(), newFilesSet);
+
+    if (toBeAddedFiles.isEmpty() && toBeRemovedFiles.isEmpty()) {
+      return;
+    }
+
+    LOG.info("Refreshing store files for " + this + " files to add: " + 
toBeAddedFiles +
+      " files to remove: " + toBeRemovedFiles);
+
+    Set<HStoreFile> toBeRemovedStoreFiles = new 
HashSet<>(toBeRemovedFiles.size());
+    for (StoreFileInfo sfi : toBeRemovedFiles) {
+      toBeRemovedStoreFiles.add(currentFilesSet.get(sfi));
+    }
+
+    // try to open the files
+    List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles, false);
+
+    // propagate the file changes to the underlying store file manager
+    replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); // won't throw an 
exception
+  }
+
+  /**
+   * Commit the given {@code files}.
+   * <p/>
+   * We will move the files into the data directory and open them.
+   * @param files the files to commit
+   * @param validate whether to validate the store files
+   * @return the committed store files
+   */
+  public List<HStoreFile> commitStoreFiles(List<Path> files, boolean validate) 
throws IOException {
+    List<HStoreFile> committedFiles = new ArrayList<>(files.size());
+    HRegionFileSystem hfs = ctx.getRegionFileSystem();
+    String familyName = ctx.getFamily().getNameAsString();
+    Path storeDir = hfs.getStoreDir(familyName);
+    for (Path file : files) {
+      try {
+        if (validate) {
+          validateStoreFile(file);
+        }
+        Path committedPath;
+        // As we want to support writing to data directory directly, here we 
need to check whether
+        // the store file is already in the right place
+        if (file.getParent() != null && file.getParent().equals(storeDir)) {
+          // already in the right place, skip renaming
+          committedPath = file;
+        } else {
+          // Write-out finished successfully, move into the right spot
+          committedPath = hfs.commitStoreFile(familyName, file);
+        }
+        HStoreFile sf = createStoreFileAndReader(committedPath);
+        committedFiles.add(sf);
+      } catch (IOException e) {
+        LOG.error("Failed to commit store file {}", file, e);
+        // Try to delete the files we have committed before.
+        // It is OK to fail when deleting as leaving the file there does not 
cause any data
+        // corruption problem. It just introduces some duplicated data which 
may impact read
+        // performance a little when reading before compaction.
+        for (HStoreFile sf : committedFiles) {
+          Path pathToDelete = sf.getPath();
+          try {
+            sf.deleteStoreFile();
+          } catch (IOException deleteEx) {
+            LOG.warn(HBaseMarkers.FATAL, "Failed to delete committed store 
file {}", pathToDelete,
+              deleteEx);
+          }
+        }
+        throw new IOException("Failed to commit the flush", e);
+      }
+    }
+    return committedFiles;
+  }
+
+  @FunctionalInterface
+  public interface IOExceptionRunnable {
+    void run() throws IOException;
+  }
+
+  /**
+   * Add the store files to the store file manager, and also record them in the
+   * store file tracker.
+   * <p/>
+   * The {@code actionAfterAdding} will be executed after the insertion into the
+   * store file manager, under the lock protection. Usually this is for clearing
+   * the memstore snapshot.
+   */
+  public void addStoreFiles(Collection<HStoreFile> storeFiles,
+    IOExceptionRunnable actionAfterAdding) throws IOException {
+    storeFileTracker.add(StoreUtils.toStoreFileInfo(storeFiles));
+    writeLock();
+    try {
+      storeFileManager.insertNewFiles(storeFiles);
+      actionAfterAdding.run();
+    } finally {
+      // We need the lock, as long as we are updating the storeFiles
+      // or changing the memstore. Let us release it before calling
+      // notifyChangeReadersObservers. See HBASE-4485 for a possible
+      // deadlock scenario that could have happened if we continued to hold
+      // the lock.
+      writeUnlock();
+    }
+  }
+
+  public void replaceStoreFiles(Collection<HStoreFile> compactedFiles,
+    Collection<HStoreFile> newFiles) throws IOException {
+    storeFileTracker.replace(StoreUtils.toStoreFileInfo(compactedFiles),
+      StoreUtils.toStoreFileInfo(newFiles));
+    writeLock();
+    try {
+      storeFileManager.addCompactionResults(compactedFiles, newFiles);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  public void removeCompactedFiles(Collection<HStoreFile> compactedFiles) {
+    writeLock();
+    try {
+      storeFileManager.removeCompactedFiles(compactedFiles);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Create the StoreEngine configured for the given Store.
-   * @param store The store. An unfortunate dependency needed due to it
-   *              being passed to coprocessors via the compactor.
+   * @param store The store. An unfortunate dependency needed due to it being 
passed to coprocessors
+   *          via the compactor.
    * @param conf Store configuration.
    * @param cellComparator CellComparator for storeFileManager.
    * @return StoreEngine to use.
    */
-  public static StoreEngine<?, ?, ?, ?> create(
-      HStore store, Configuration conf, CellComparator cellComparator) throws 
IOException {
+  public static StoreEngine<?, ?, ?, ?> create(HStore store, Configuration 
conf,
+    CellComparator cellComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, 
DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor(
-          className, new Class[] { }, new Object[] { });
+      StoreEngine<?, ?, ?, ?> se =
+        ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {}, 
new Object[] {});
       se.createComponentsOnce(conf, store, cellComparator);
       return se;
     } catch (Exception e) {
       throw new IOException("Unable to load configured store engine '" + 
className + "'", e);
     }
   }
+
+  @RestrictedApi(explanation = "Should only be called in TestHStore", link = 
"",
+    allowedOnPath = ".*/TestHStore.java")
+  ReadWriteLock getLock() {
+    return storeLock;
+  }
 }
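
To illustrate the locking contract described in the StoreEngine class javadoc above, a hypothetical caller takes the shared lock only while looking at the store file list, while mutations go through the StoreEngine methods that take the write lock internally. A sketch under those assumptions (the engine, flushedFiles, memstore and snapshotId variables are not part of this patch):

    // Look at the current store file list under the shared lock.
    engine.readLock();
    try {
      for (HStoreFile sf : engine.getStoreFileManager().getStorefiles()) {
        // read-only inspection; no mutation while holding the read lock
      }
    } finally {
      engine.readUnlock();
    }

    // Add flushed files: the tracker is updated first, then the manager is
    // updated under the write lock taken inside addStoreFiles, and the action
    // (e.g. clearing the memstore snapshot) runs under that same lock.
    engine.addStoreFiles(flushedFiles, () -> memstore.clearSnapshot(snapshotId));
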
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
index 27127f3..a40b209 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Comparator;
@@ -49,12 +50,16 @@ public interface StoreFileManager {
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = 
"",
+    allowedOnPath = 
".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void loadFiles(List<HStoreFile> storeFiles);
 
   /**
    * Adds new files, either for from MemStore flush or bulk insert, into the 
structure.
    * @param sfs New store files.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = 
"",
+    allowedOnPath = 
".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void insertNewFiles(Collection<HStoreFile> sfs);
 
   /**
@@ -62,12 +67,16 @@ public interface StoreFileManager {
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = 
"",
+    allowedOnPath = 
".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void addCompactionResults(Collection<HStoreFile> compactedFiles, 
Collection<HStoreFile> results);
 
   /**
    * Remove the compacted files
    * @param compactedFiles the list of compacted files
    */
+  @RestrictedApi(explanation = "Should only be called in StoreEngine", link = 
"",
+    allowedOnPath = 
".*(/org/apache/hadoop/hbase/regionserver/StoreEngine.java|/src/test/.*)")
   void removeCompactedFiles(Collection<HStoreFile> compactedFiles);
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 1064b6c..5803128 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -70,10 +70,17 @@ abstract class StoreFlusher {
     writer.close();
   }
 
+  protected final StoreFileWriter createWriter(MemStoreSnapshot snapshot, 
boolean alwaysIncludesTag)
+    throws IOException {
+    return store.getStoreEngine()
+      
.createWriter(CreateStoreFileWriterParams.create().maxKeyCount(snapshot.getCellsCount())
+        
.compression(store.getColumnFamilyDescriptor().getCompressionType()).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(alwaysIncludesTag || 
snapshot.isTagsPresent())
+        .shouldDropBehind(false));
+  }
 
   /**
    * Creates the scanner for flushing snapshot. Also calls coprocessors.
-   * @param snapshotScanners
    * @return The scanner; null if coprocessor is canceling the flush.
    */
   protected final InternalScanner createScanner(List<KeyValueScanner> 
snapshotScanners,
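
The CreateStoreFileWriterParams builder introduced by this patch replaces the long createWriterInTmp(...) argument lists. A sketch of a flush-style writer, using only the setters visible above (the store, cellsCount and tagsPresent variables are assumptions, not from this patch):

    CreateStoreFileWriterParams params = CreateStoreFileWriterParams.create()
      .maxKeyCount(cellsCount)          // expected number of cells to write
      .compression(store.getColumnFamilyDescriptor().getCompressionType())
      .isCompaction(false)              // flush, not compaction
      .includeMVCCReadpoint(true)
      .includesTag(tagsPresent)
      .shouldDropBehind(false);
    StoreFileWriter writer = store.getStoreEngine().createWriter(params);
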
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
index 454b244..10a9330 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
-
+import java.util.function.Predicate;
+import java.util.function.ToLongFunction;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -42,10 +45,13 @@ import org.slf4j.LoggerFactory;
  * Utility functions for region server storage layer.
  */
 @InterfaceAudience.Private
-public class StoreUtils {
+public final class StoreUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(StoreUtils.class);
 
+  private StoreUtils() {
+  }
+
   /**
    * Creates a deterministic hash code for store file collection.
    */
@@ -171,4 +177,31 @@ public class StoreUtils {
     return new CompoundConfiguration().add(conf).addBytesMap(td.getValues())
         .addStringMap(cfd.getConfiguration()).addBytesMap(cfd.getValues());
   }
+
+  public static List<StoreFileInfo> toStoreFileInfo(Collection<HStoreFile> 
storefiles) {
+    return 
storefiles.stream().map(HStoreFile::getFileInfo).collect(Collectors.toList());
+  }
+
+  public static long getTotalUncompressedBytes(List<HStoreFile> files) {
+    return files.stream()
+      .mapToLong(file -> getStorefileFieldSize(file, 
StoreFileReader::getTotalUncompressedBytes))
+      .sum();
+  }
+
+  public static long getStorefilesSize(Collection<HStoreFile> files,
+    Predicate<HStoreFile> predicate) {
+    return files.stream().filter(predicate)
+      .mapToLong(file -> getStorefileFieldSize(file, 
StoreFileReader::length)).sum();
+  }
+
+  public static long getStorefileFieldSize(HStoreFile file, 
ToLongFunction<StoreFileReader> f) {
+    if (file == null) {
+      return 0L;
+    }
+    StoreFileReader reader = file.getReader();
+    if (reader == null) {
+      return 0L;
+    }
+    return f.applyAsLong(reader);
+  }
 }
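
With the size helpers now static on StoreUtils, callers that previously went through HStore's private methods can compute sizes directly. A small sketch (the files collection is assumed):

    // On-disk size of the HFiles only; files whose reader is not open count as 0.
    long hfilesSize = StoreUtils.getStorefilesSize(files, HStoreFile::isHFile);
    // Total uncompressed bytes across all files.
    long uncompressed = StoreUtils.getTotalUncompressedBytes(new ArrayList<>(files));
    // Convert to StoreFileInfo, e.g. for handing the list to a StoreFileTracker.
    List<StoreFileInfo> infos = StoreUtils.toStoreFileInfo(files);
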
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 14863a6..bfb3f64 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -20,20 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 1560aef..f8183b7 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -70,7 +70,7 @@ public class StripeStoreFlusher extends StoreFlusher {
     StripeMultiFileWriter mw = null;
     try {
       mw = req.createWriter(); // Writer according to the policy.
-      StripeMultiFileWriter.WriterFactory factory = 
createWriterFactory(cellsCount);
+      StripeMultiFileWriter.WriterFactory factory = 
createWriterFactory(snapshot);
       StoreScanner storeScanner = (scanner instanceof StoreScanner) ? 
(StoreScanner)scanner : null;
       mw.init(storeScanner, factory);
 
@@ -98,13 +98,12 @@ public class StripeStoreFlusher extends StoreFlusher {
     return result;
   }
 
-  private StripeMultiFileWriter.WriterFactory createWriterFactory(final long 
kvCount) {
+  private StripeMultiFileWriter.WriterFactory 
createWriterFactory(MemStoreSnapshot snapshot) {
     return new StripeMultiFileWriter.WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        StoreFileWriter writer = store.createWriterInTmp(kvCount,
-            store.getColumnFamilyDescriptor().getCompressionType(), false, 
true, true, false);
-        return writer;
+        // XXX: it used to always pass true for includesTag, re-consider?
+        return StripeStoreFlusher.this.createWriter(snapshot, true);
       }
     };
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
index 42841bf..533be17 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java
@@ -51,13 +51,14 @@ public abstract class AbstractMultiOutputCompactor<T 
extends AbstractMultiFileWr
     WriterFactory writerFactory = new WriterFactory() {
       @Override
       public StoreFileWriter createWriter() throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, major);
+        return AbstractMultiOutputCompactor.this.createWriter(fd, 
shouldDropBehind, major);
       }
 
       @Override
       public StoreFileWriter createWriterWithStoragePolicy(String 
fileStoragePolicy)
-          throws IOException {
-        return createTmpWriter(fd, shouldDropBehind, fileStoragePolicy, major);
+        throws IOException {
+        return AbstractMultiOutputCompactor.this.createWriter(fd, 
shouldDropBehind,
+          fileStoragePolicy, major);
       }
     };
     // Prepare multi-writer, and perform the compaction using scanner and 
writer.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index e524f7d..47ef0f2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -61,6 +61,7 @@ import 
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
 /**
@@ -261,29 +262,32 @@ public abstract class Compactor<T extends CellSink> {
     }
   };
 
+  protected final CreateStoreFileWriterParams createParams(FileDetails fd, 
boolean shouldDropBehind,
+    boolean major) {
+    return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
+      .compression(major ? majorCompactionCompression : 
minorCompactionCompression)
+      .isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
+      .includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
+      .totalCompactedFilesSize(fd.totalCompactedFilesSize);
+  }
+
   /**
-   * Creates a writer for a new file in a temporary directory.
+   * Creates a writer for a new file.
    * @param fd The file details.
-   * @return Writer for a new StoreFile in the tmp dir.
+   * @return Writer for a new StoreFile
    * @throws IOException if creation failed
    */
-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean 
shouldDropBehind, boolean major)
-      throws IOException {
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean 
shouldDropBehind,
+    boolean major) throws IOException {
     // When all MVCC readpoints are 0, don't write them.
     // See HBASE-8166, HBASE-12600, and HBASE-13389.
-    return store.createWriterInTmp(fd.maxKeyCount,
-        major ? majorCompactionCompression : minorCompactionCompression,
-        true, fd.maxMVCCReadpoint > 0,
-        fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize,
-        HConstants.EMPTY_STRING);
+    return store.getStoreEngine().createWriter(createParams(fd, 
shouldDropBehind, major));
   }
 
-  protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean 
shouldDropBehind,
-      String fileStoragePolicy, boolean major) throws IOException {
-    return store.createWriterInTmp(fd.maxKeyCount,
-      major ? majorCompactionCompression : minorCompactionCompression,
-      true, fd.maxMVCCReadpoint > 0,
-      fd.maxTagsLength > 0, shouldDropBehind, fd.totalCompactedFilesSize, 
fileStoragePolicy);
+  protected final StoreFileWriter createWriter(FileDetails fd, boolean 
shouldDropBehind,
+    String fileStoragePolicy, boolean major) throws IOException {
+    return store.getStoreEngine()
+      .createWriter(createParams(fd, shouldDropBehind, 
major).fileStoragePolicy(fileStoragePolicy));
   }
 
   private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, 
ScanType scanType,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 49d3e8e..afa2429 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -45,14 +45,14 @@ public class DefaultCompactor extends 
Compactor<StoreFileWriter> {
   }
 
   private final CellSinkFactory<StoreFileWriter> writerFactory =
-      new CellSinkFactory<StoreFileWriter>() {
-        @Override
-        public StoreFileWriter createWriter(InternalScanner scanner,
-            
org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd,
-            boolean shouldDropBehind, boolean major) throws IOException {
-          return createTmpWriter(fd, shouldDropBehind, major);
-        }
-      };
+    new CellSinkFactory<StoreFileWriter>() {
+      @Override
+      public StoreFileWriter createWriter(InternalScanner scanner,
+        org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails 
fd,
+        boolean shouldDropBehind, boolean major) throws IOException {
+        return DefaultCompactor.this.createWriter(fd, shouldDropBehind, major);
+      }
+    };
 
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
new file mode 100644
index 0000000..d4c9a86
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/DefaultStoreFileTracker.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The default store file tracker implementation, where we do not persist the
+ * store file list and instead rely on listing when loading store files.
+ */
+@InterfaceAudience.Private
+class DefaultStoreFileTracker extends StoreFileTrackerBase {
+
+  public DefaultStoreFileTracker(Configuration conf, TableName tableName, 
boolean isPrimaryReplica,
+    StoreContext ctx) {
+    super(conf, tableName, isPrimaryReplica, ctx);
+  }
+
+  @Override
+  public List<StoreFileInfo> load() throws IOException {
+    return 
ctx.getRegionFileSystem().getStoreFiles(ctx.getFamily().getNameAsString());
+  }
+
+  @Override
+  public boolean requireWritingToTmpDirFirst() {
+    return true;
+  }
+
+  @Override
+  protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws 
IOException {
+    // NOOP
+  }
+
+  @Override
+  protected void doAddCompactionResults(Collection<StoreFileInfo> 
compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    // NOOP
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
new file mode 100644
index 0000000..aadedc8
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTracker.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * An interface to define how we track the store files for a given store.
+ * <p/>
+ * Historically, we first write a store file to a tmp directory and then rename it
+ * to the actual data file. Once a store file is under the data directory, we
+ * consider it 'committed', and we need to list the directory when loading store
+ * files.
+ * <p/>
+ * In the cloud era we want to keep store files on object storage, where rename
+ * and list are not as cheap as on HDFS, especially rename. Introducing a metadata
+ * management layer for the object storage could solve the problem, but we still
+ * want HBase to run on pure object storage, so here we introduce this interface
+ * to abstract how we track the store files. The old implementation simply
+ * persists nothing here and loads store files by listing. When running on object
+ * storage, we could persist the store file list in a system region, or in a file
+ * on the object storage, which makes it possible to write directly into the data
+ * directory to avoid renaming, and also to avoid listing when loading store
+ * files.
+ * <p/>
+ * Implementations are required to be thread safe, as flush and compaction may
+ * occur at the same time, and we may also run multiple compactions at the same
+ * time. Since an implementation may choose to persist the store file list to
+ * external storage, which could be slow, it is the callers' duty not to invoke
+ * these methods while holding a lock that may block normal read/write requests.
+ */
+@InterfaceAudience.Private
+public interface StoreFileTracker {
+
+  /**
+   * Load the store files list when opening a region.
+   */
+  List<StoreFileInfo> load() throws IOException;
+
+  /**
+   * Add new store files.
+   * <p/>
+   * Used for flush and bulk load.
+   */
+  void add(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  /**
+   * Add new store files and remove compacted store files after compaction.
+   */
+  void replace(Collection<StoreFileInfo> compactedFiles, 
Collection<StoreFileInfo> newFiles)
+    throws IOException;
+
+  /**
+   * Create a writer for writing new store files.
+   * @return Writer for a new StoreFile
+   */
+  StoreFileWriter createWriter(CreateStoreFileWriterParams params) throws IOException;
+}
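
To make the contract above concrete, here is a minimal sketch of how a caller drives a tracker over the store lifecycle. It is illustrative only and not part of the patch; the parameter names are placeholders, but every call it makes is defined by the interfaces added in this commit.

    // Illustrative sketch only; parameter names are placeholders.
    void trackerLifecycleSketch(Configuration conf, TableName tableName, StoreContext ctx,
        StoreFileInfo flushedFile, Collection<StoreFileInfo> compactionInputs,
        Collection<StoreFileInfo> compactionOutputs) throws IOException {
      StoreFileTracker tracker = StoreFileTrackerFactory.create(conf, tableName, true, ctx);
      // Region open: recover the committed store file list (used to open the HStoreFiles)
      // without relying on a raw directory listing.
      List<StoreFileInfo> committed = tracker.load();
      // Flush or bulk load: record the newly committed file.
      tracker.add(Collections.singletonList(flushedFile));
      // Compaction: atomically swap the compacted inputs for the new outputs.
      tracker.replace(compactionInputs, compactionOutputs);
    }
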
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
new file mode 100644
index 0000000..2451f45
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerBase.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for all store file trackers.
+ * <p/>
+ * Mainly used to hold the common logic that skips persisting for secondary replicas.
+ */
+@InterfaceAudience.Private
+abstract class StoreFileTrackerBase implements StoreFileTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StoreFileTrackerBase.class);
+
+  protected final Configuration conf;
+
+  protected final TableName tableName;
+
+  protected final boolean isPrimaryReplica;
+
+  protected final StoreContext ctx;
+
+  private volatile boolean cacheOnWriteLogged;
+
+  protected StoreFileTrackerBase(Configuration conf, TableName tableName, boolean isPrimaryReplica,
+    StoreContext ctx) {
+    this.conf = conf;
+    this.tableName = tableName;
+    this.isPrimaryReplica = isPrimaryReplica;
+    this.ctx = ctx;
+  }
+
+  @Override
+  public final void add(Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddNewStoreFiles(newFiles);
+    }
+  }
+
+  @Override
+  public final void replace(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException {
+    if (isPrimaryReplica) {
+      doAddCompactionResults(compactedFiles, newFiles);
+    }
+  }
+
+  private HFileContext createFileContext(Compression.Algorithm compression,
+    boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context encryptionContext) {
+    if (compression == null) {
+      compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
+    }
+    ColumnFamilyDescriptor family = ctx.getFamily();
+    HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(includeMVCCReadpoint)
+      .withIncludesTags(includesTag).withCompression(compression)
+      .withCompressTags(family.isCompressTags()).withChecksumType(StoreUtils.getChecksumType(conf))
+      .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf))
+      .withBlockSize(family.getBlocksize()).withHBaseCheckSum(true)
+      .withDataBlockEncoding(family.getDataBlockEncoding()).withEncryptionContext(encryptionContext)
+      .withCreateTime(EnvironmentEdgeManager.currentTime()).withColumnFamily(family.getName())
+      .withTableName(tableName.getName()).withCellComparator(ctx.getComparator()).build();
+    return hFileContext;
+  }
+
+  @Override
+  public final StoreFileWriter createWriter(CreateStoreFileWriterParams params)
+    throws IOException {
+    if (!isPrimaryReplica) {
+      throw new IllegalStateException("Should not call create writer on secondary replicas");
+    }
+    // creating new cache config for each new writer
+    final CacheConfig cacheConf = ctx.getCacheConf();
+    final CacheConfig writerCacheConf = new CacheConfig(cacheConf);
+    long totalCompactedFilesSize = params.totalCompactedFilesSize();
+    if (params.isCompaction()) {
+      // Don't cache data on write on compactions, unless specifically configured to do so
+      // Cache only when total file size remains lower than configured threshold
+      final boolean cacheCompactedBlocksOnWrite = cacheConf.shouldCacheCompactedBlocksOnWrite();
+      // if data blocks are to be cached on write
+      // during compaction, we should forcefully
+      // cache index and bloom blocks as well
+      if (cacheCompactedBlocksOnWrite &&
+        totalCompactedFilesSize <= cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheCompactedBlocksOnWrite is true, hence 
enabled " +
+            "cacheOnWrite for Data blocks, Index blocks and Bloom filter 
blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      } else {
+        writerCacheConf.setCacheDataOnWrite(false);
+        if (totalCompactedFilesSize > cacheConf.getCacheCompactedBlocksOnWriteThreshold()) {
+          // checking condition once again for logging
+          LOG.debug(
+            "For {}, setting cacheCompactedBlocksOnWrite as false as total size of compacted " +
+              "files - {}, is greater than cacheCompactedBlocksOnWriteThreshold - {}",
+            this, totalCompactedFilesSize, cacheConf.getCacheCompactedBlocksOnWriteThreshold());
+        }
+      }
+    } else {
+      final boolean shouldCacheDataOnWrite = cacheConf.shouldCacheDataOnWrite();
+      if (shouldCacheDataOnWrite) {
+        writerCacheConf.enableCacheOnWrite();
+        if (!cacheOnWriteLogged) {
+          LOG.info("For {} , cacheDataOnWrite is true, hence enabled 
cacheOnWrite for " +
+            "Index blocks and Bloom filter blocks", this);
+          cacheOnWriteLogged = true;
+        }
+      }
+    }
+    Encryption.Context encryptionContext = ctx.getEncryptionContext();
+    HFileContext hFileContext = createFileContext(params.compression(),
+      params.includeMVCCReadpoint(), params.includesTag(), encryptionContext);
+    Path outputDir;
+    if (requireWritingToTmpDirFirst()) {
+      outputDir =
+        new Path(ctx.getRegionFileSystem().getTempDir(), ctx.getFamily().getNameAsString());
+    } else {
+      throw new UnsupportedOperationException("not supported yet");
+    }
+    StoreFileWriter.Builder builder =
+      new StoreFileWriter.Builder(conf, writerCacheConf, ctx.getRegionFileSystem().getFileSystem())
+        .withOutputDir(outputDir).withBloomType(ctx.getBloomFilterType())
+        .withMaxKeyCount(params.maxKeyCount()).withFavoredNodes(ctx.getFavoredNodes())
+        .withFileContext(hFileContext).withShouldDropCacheBehind(params.shouldDropBehind())
+        .withCompactedFilesSupplier(ctx.getCompactedFilesSupplier())
+        .withFileStoragePolicy(params.fileStoragePolicy());
+    return builder.build();
+  }
+
+  /**
+   * Whether the implementation of this tracker requires you to write to the temp directory first,
+   * i.e., it does not allow broken store files under the actual data directory.
+   */
+  protected abstract boolean requireWritingToTmpDirFirst();
+
+  protected abstract void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException;
+
+  protected abstract void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
+    Collection<StoreFileInfo> newFiles) throws IOException;
+}
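
The base class leaves three hooks for concrete trackers. As a sketch of what a persisting implementation might look like, here is a hypothetical subclass; it is not part of this patch, and the persistence details are deliberately elided. Note that createWriter in this patch still only supports the write-to-tmp path, so the sketch keeps requireWritingToTmpDirFirst() returning true.

    // Hypothetical subclass, for illustration only. It would live in the same package as
    // StoreFileTrackerBase and persist the store file list instead of relying on listing.
    class PersistedListStoreFileTracker extends StoreFileTrackerBase {

      PersistedListStoreFileTracker(Configuration conf, TableName tableName,
          boolean isPrimaryReplica, StoreContext ctx) {
        super(conf, tableName, isPrimaryReplica, ctx);
      }

      @Override
      public List<StoreFileInfo> load() throws IOException {
        // Read the committed list from wherever it is persisted (a manifest file, a system
        // region, ...) instead of listing the data directory.
        return readPersistedList();
      }

      @Override
      protected boolean requireWritingToTmpDirFirst() {
        // Keep the tmp-dir protocol for now; the direct-write branch of createWriter is
        // still unimplemented in this patch.
        return true;
      }

      @Override
      protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
        // Append the flushed or bulk-loaded files to the persisted list in one atomic update.
      }

      @Override
      protected void doAddCompactionResults(Collection<StoreFileInfo> compactedFiles,
          Collection<StoreFileInfo> newFiles) throws IOException {
        // Replace compaction inputs with outputs in the persisted list in one atomic update.
      }

      private List<StoreFileInfo> readPersistedList() throws IOException {
        return Collections.emptyList(); // placeholder for the real persistence read
      }
    }
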
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
new file mode 100644
index 0000000..4f7231b
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/storefiletracker/StoreFileTrackerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.storefiletracker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.StoreContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory for creating store file trackers.
+ */
+@InterfaceAudience.Private
+public final class StoreFileTrackerFactory {
+
+  public static StoreFileTracker create(Configuration conf, TableName tableName,
+    boolean isPrimaryReplica, StoreContext ctx) {
+    return new DefaultStoreFileTracker(conf, tableName, isPrimaryReplica, ctx);
+  }
+}
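
The factory currently always returns DefaultStoreFileTracker. Presumably the value of funneling creation through this class is that the implementation can later be selected from configuration; a rough sketch of what that could look like follows. The configuration key and the reflection-based lookup are hypothetical and not part of this patch.

    // Hypothetical follow-up sketch only; the key name is made up for illustration.
    public static StoreFileTracker create(Configuration conf, TableName tableName,
        boolean isPrimaryReplica, StoreContext ctx) {
      Class<? extends StoreFileTrackerBase> clazz = conf.getClass(
        "hbase.store.file-tracker.impl", DefaultStoreFileTracker.class, StoreFileTrackerBase.class);
      try {
        // All trackers are assumed to share the (conf, tableName, isPrimaryReplica, ctx) constructor.
        return clazz
          .getDeclaredConstructor(Configuration.class, TableName.class, boolean.class, StoreContext.class)
          .newInstance(conf, tableName, isPrimaryReplica, ctx);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException("Failed to instantiate " + clazz.getName(), e);
      }
    }
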
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 22ec6cb..291b909 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -105,7 +105,6 @@ class MajorCompactionRequest {
 
   boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
-
     // do we have any store files?
     Collection<StoreFileInfo> storeFiles = fileSystem.getStoreFiles(family);
     if (storeFiles == null) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..9314d7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -211,11 +211,13 @@ public class TestIOFencing {
 
     @Override
     protected void refreshStoreSizeAndTotalBytes() throws IOException {
-      try {
-        r.compactionsWaiting.countDown();
-        r.compactionsBlocked.await();
-      } catch (InterruptedException ex) {
-        throw new IOException(ex);
+      if (r != null) {
+        try {
+          r.compactionsWaiting.countDown();
+          r.compactionsBlocked.await();
+        } catch (InterruptedException ex) {
+          throw new IOException(ex);
+        }
       }
       super.refreshStoreSizeAndTotalBytes();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 54b0d18..7dbb680 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -215,8 +215,10 @@ public class TestCacheOnWriteInSchema {
   @Test
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
-    StoreFileWriter writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(Integer.MAX_VALUE)
+        .compression(HFile.DEFAULT_COMPRESSION_ALGORITHM).isCompaction(false)
+        .includeMVCCReadpoint(true).includesTag(false).shouldDropBehind(false));
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
index e832c47..3784876 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
@@ -65,9 +65,12 @@ public class TestDefaultStoreEngine {
         DummyCompactionPolicy.class.getName());
     conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
         DummyStoreFlusher.class.getName());
+    HRegion mockRegion = Mockito.mock(HRegion.class);
+    HStore mockStore = Mockito.mock(HStore.class);
     Mockito.when(mockStore.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
-    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
+    Mockito.when(mockStore.getHRegion()).thenReturn(mockRegion);
+    StoreEngine<?, ?, ?, ?> se =
+      StoreEngine.create(mockStore, conf, CellComparatorImpl.COMPARATOR);
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
     Assert.assertTrue(se.getStoreFlusher() instanceof DummyStoreFlusher);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 1d302d4..8bcbf10 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -5761,7 +5761,7 @@ public class TestHRegion {
       Collection<HStoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
       primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
       Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem()
-          .getStoreFiles(families[0]);
+          .getStoreFiles(Bytes.toString(families[0]));
       Assert.assertTrue(storeFileInfos == null || storeFileInfos.isEmpty());
 
       verifyData(secondaryRegion, 0, 1000, cq, families);
@@ -7665,7 +7665,7 @@ public class TestHRegion {
             getCacheConfig() != null? getCacheConfig().shouldEvictOnClose(): true;
         for (Path newFile : newFiles) {
           // Create storefile around what we wrote with a reader on it.
-          HStoreFile sf = createStoreFileAndReader(newFile);
+          HStoreFile sf = storeEngine.createStoreFileAndReader(newFile);
           sf.closeStoreFile(evictOnClose);
           sfs.add(sf);
         }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
index 24c3ccd..92c2104 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java
@@ -53,8 +53,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.IntBinaryOperator;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -314,7 +314,7 @@ public class TestHStore {
 
   /**
    * Verify that compression and data block encoding are respected by the
-   * Store.createWriterInTmp() method, used on store flush.
+   * createWriter method, used on store flush.
    */
   @Test
   public void testCreateWriter() throws Exception {
@@ -326,9 +326,11 @@ public class TestHStore {
         .build();
     init(name.getMethodName(), conf, hcd);
 
-    // Test createWriterInTmp()
-    StoreFileWriter writer =
-        store.createWriterInTmp(4, hcd.getCompressionType(), false, true, false, false);
+    // Test createWriter
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(4)
+        .compression(hcd.getCompressionType()).isCompaction(false).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(false));
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
@@ -1026,19 +1028,19 @@ public class TestHStore {
     // add one more file
     addStoreFile();
 
-    HStore spiedStore = spy(store);
+    StoreEngine<?, ?, ?, ?> spiedStoreEngine = spy(store.getStoreEngine());
 
     // call first time after files changed
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();
     assertEquals(2, this.store.getStorefilesCount());
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
 
     // call second time
-    spiedStore.refreshStoreFiles();
+    spiedStoreEngine.refreshStoreFiles();
 
     // ensure that replaceStoreFiles is not called, i.e, the times does not change, if files are not
     // refreshed,
-    verify(spiedStore, times(1)).replaceStoreFiles(any(), any());
+    verify(spiedStoreEngine, times(1)).replaceStoreFiles(any(), any());
   }
 
   private long countMemStoreScanner(StoreScanner scanner) {
@@ -1649,7 +1651,7 @@ public class TestHStore {
     // Do compaction
     MyThread thread = new MyThread(storeScanner);
     thread.start();
-    store.replaceStoreFiles(actualStorefiles, actualStorefiles1);
+    store.replaceStoreFiles(actualStorefiles, actualStorefiles1, false);
     thread.join();
     KeyValueHeap heap2 = thread.getHeap();
     assertFalse(heap.equals(heap2));
@@ -1715,8 +1717,10 @@ public class TestHStore {
   @Test
   public void testHFileContextSetWithCFAndTable() throws Exception {
     init(this.name.getMethodName());
-    StoreFileWriter writer = store.createWriterInTmp(10000L,
-        Compression.Algorithm.NONE, false, true, false, true);
+    StoreFileWriter writer = store.getStoreEngine()
+      .createWriter(CreateStoreFileWriterParams.create().maxKeyCount(10000L)
+        .compression(Compression.Algorithm.NONE).isCompaction(true).includeMVCCReadpoint(true)
+        .includesTag(false).shouldDropBehind(true));
     HFileContext hFileContext = writer.getHFileWriter().getFileContext();
     assertArrayEquals(family, hFileContext.getColumnFamily());
     assertArrayEquals(table, hFileContext.getTableName());
@@ -3166,7 +3170,8 @@ public class TestHStore {
         int currentCount = clearSnapshotCounter.incrementAndGet();
         if (currentCount == 1) {
           try {
-            if (store.lock.isWriteLockedByCurrentThread()) {
+            if (((ReentrantReadWriteLock) store.getStoreEngine().getLock())
+              .isWriteLockedByCurrentThread()) {
               shouldWait = false;
             }
             /**
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
index 60f65be..1dbf37a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java
@@ -245,7 +245,7 @@ public class TestRegionMergeTransactionOnCluster {
         TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
       int count = 0;
       for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-        count += hrfs.getStoreFiles(colFamily.getName()).size();
+        count += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
       }
       ADMIN.compactRegion(mergedRegionInfo.getRegionName());
       // clean up the merged region store files
@@ -254,7 +254,7 @@ public class TestRegionMergeTransactionOnCluster {
       int newcount = 0;
       while (EnvironmentEdgeManager.currentTime() < timeout) {
         for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-          newcount += hrfs.getStoreFiles(colFamily.getName()).size();
+          newcount += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
         }
         if(newcount > count) {
           break;
@@ -273,7 +273,7 @@ public class TestRegionMergeTransactionOnCluster {
       while (EnvironmentEdgeManager.currentTime() < timeout) {
         int newcount1 = 0;
         for(ColumnFamilyDescriptor colFamily : columnFamilies) {
-          newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
+          newcount1 += hrfs.getStoreFiles(colFamily.getNameAsString()).size();
         }
         if(newcount1 <= 1) {
           break;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
index 9141327..2fab050 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -98,7 +97,7 @@ public class TestStoreFileRefresherChore {
     }
 
     @Override
-    public Collection<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
+    public List<StoreFileInfo> getStoreFiles(String familyName) throws IOException {
       if (fail) {
         throw new IOException("simulating FS failure");
       }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
index c2b7849..79a9705 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScannerClosure.java
@@ -29,7 +29,6 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -128,13 +127,12 @@ public class TestStoreScannerClosure {
       p.addColumn(fam, Bytes.toBytes("q1"), Bytes.toBytes("val"));
       region.put(p);
       HStore store = region.getStore(fam);
-      ReentrantReadWriteLock lock = store.lock;
       // use the lock to manually get a new memstore scanner. this is what
       // HStore#notifyChangedReadersObservers does under the lock.(lock is not needed here
       //since it is just a testcase).
-      lock.readLock().lock();
+      store.getStoreEngine().readLock();
       final List<KeyValueScanner> memScanners = store.memstore.getScanners(Long.MAX_VALUE);
-      lock.readLock().unlock();
+      store.getStoreEngine().readUnlock();
       Thread closeThread = new Thread() {
         public void run() {
           // close should be completed
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index f5330f6..eb0b1c1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -118,8 +118,10 @@ public class TestStripeStoreEngine {
   }
 
   private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+    HRegion region = mock(HRegion.class);
     HStore store = mock(HStore.class);
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    when(store.getHRegion()).thenReturn(region);
     CellComparatorImpl kvComparator = mock(CellComparatorImpl.class);
     return (TestStoreEngine) StoreEngine.create(store, conf, kvComparator);
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index d7b7ba7..0ea82c5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -22,9 +22,6 @@ import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.cre
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -45,11 +42,13 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
@@ -110,10 +109,9 @@ public class TestDateTieredCompactor {
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
     OptionalLong maxSequenceId = StoreUtils.getMaxSequenceIdInList(storefiles);
     when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 2ba15d1..bdab20e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -30,7 +30,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -60,6 +59,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -864,12 +865,9 @@ public class TestStripeCompactionPolicy {
     when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getColumnFamilyDescriptor()).thenReturn(familyDescriptor);
     when(store.getRegionInfo()).thenReturn(info);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(
-      store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-        anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
 
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.regionserver.compaction.private.readers", 
usePrivateReaders);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index e49174e..ae59c74 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -21,9 +21,6 @@ import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_K
 import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -43,10 +40,12 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
 import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
@@ -209,10 +208,9 @@ public class TestStripeCompactor {
     when(store.areWritesEnabled()).thenReturn(true);
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
-    when(store.createWriterInTmp(anyLong(), any(), anyBoolean(),
-      anyBoolean(), anyBoolean(), anyBoolean(), anyLong(), anyString())).thenAnswer(writers);
+    StoreEngine storeEngine = mock(StoreEngine.class);
+    when(storeEngine.createWriter(any(CreateStoreFileWriterParams.class))).thenAnswer(writers);
+    when(store.getStoreEngine()).thenReturn(storeEngine);
     when(store.getComparator()).thenReturn(CellComparatorImpl.COMPARATOR);
 
     return new StripeCompactor(conf, store) {
