This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-24749 by this push:
     new 49b68b0  Flush directly into data directory, skip rename when 
committing flush
49b68b0 is described below

commit 49b68b0e00c81ecaed586f5837f88ab3216dfbf0
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Jun 9 16:32:39 2021 +0100

    Flush directly into data directory, skip rename when committing flush
    
    Signed-off-by: Peter Somogyi <[email protected]>
---
 .../regionserver/DefaultStoreFlushContext.java     |  19 +--
 .../hbase/regionserver/DefaultStoreFlusher.java    |  27 ++++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |   2 +-
 .../regionserver/PersistedEngineStoreFlusher.java  |  83 ++++++++++++++
 .../regionserver/PersistedStoreFlushContext.java   |  66 +++++++++++
 .../TestPersistedEngineStoreFlusher.java           | 127 +++++++++++++++++++++
 .../TestPersistedStoreFlushContext.java            |  77 +++++++++++++
 7 files changed, 388 insertions(+), 13 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
index b97210e..d007132 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
@@ -43,12 +43,12 @@ public class DefaultStoreFlushContext extends 
StoreFlushContext {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DefaultStoreFlushContext.class);
 
-  private MemStoreSnapshot snapshot;
-  private List<Path> tempFiles;
-  private List<Path> committedFiles;
-  private long cacheFlushCount;
-  private long cacheFlushSize;
-  private long outputFileSize;
+  protected MemStoreSnapshot snapshot;
+  protected List<Path> tempFiles;
+  protected List<Path> committedFiles;
+  protected long cacheFlushCount;
+  protected long cacheFlushSize;
+  protected long outputFileSize;
 
   public void init(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker 
tracker) {
     super.init(store, cacheFlushSeqNum, tracker);
@@ -79,13 +79,18 @@ public class DefaultStoreFlushContext extends 
StoreFlushContext {
 
   @Override
   public boolean commit(MonitoredTask status) throws IOException {
+    return commit(p -> store.commitFile(p, cacheFlushSeqNum, status));
+  }
+
+  protected boolean 
commit(DefaultStoreFlusher.IOCheckedFunction<Path,HStoreFile> commitFunction)
+      throws IOException {
     if (CollectionUtils.isEmpty(this.tempFiles)) {
       return false;
     }
     List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
     for (Path storeFilePath : tempFiles) {
       try {
-        HStoreFile sf = store.commitFile(storeFilePath, cacheFlushSeqNum, 
status);
+        HStoreFile sf = commitFunction.apply(storeFilePath);
         outputFileSize += sf.getReader().length();
         storeFiles.add(sf);
       } catch (IOException ex) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a7d7fb1..cad5c99 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -47,6 +47,20 @@ public class DefaultStoreFlusher extends StoreFlusher {
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
       MonitoredTask status, ThroughputController throughputController,
       FlushLifeCycleTracker tracker) throws IOException {
+    return flushSnapshot(snapshot, cacheFlushId, status, throughputController, 
tracker,
+      s -> s.createWriterInTmp(snapshot.getCellsCount(),
+        s.getColumnFamilyDescriptor().getCompressionType(),
+            false,
+            true,
+            snapshot.isTagsPresent(),
+            false));
+  }
+
+
+  protected List<Path> flushSnapshot(MemStoreSnapshot snapshot, long 
cacheFlushId,
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker, IOCheckedFunction<HStore,StoreFileWriter> 
createWriter)
+        throws IOException {
     ArrayList<Path> result = new ArrayList<>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -60,9 +74,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(cellsCount,
-            store.getColumnFamilyDescriptor().getCompressionType(), false, 
true,
-            snapshot.isTagsPresent(), false);
+        writer = createWriter.apply(store);
         IOException e = null;
         try {
           performFlush(scanner, writer, throughputController);
@@ -82,9 +94,14 @@ public class DefaultStoreFlusher extends StoreFlusher {
       scanner.close();
     }
     LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}), 
to={}",
-        StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, 
writer.hasGeneralBloom(),
-        writer.getPath());
+      StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId, 
writer.hasGeneralBloom(),
+      writer.getPath());
     result.add(writer.getPath());
     return result;
   }
+
+  @FunctionalInterface
+  public interface IOCheckedFunction<T,R> {
+    R apply(T t) throws IOException;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7b558c9..ce0c641 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1192,7 +1192,7 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
     return builder.build();
   }
 
-  private HFileContext createFileContext(Compression.Algorithm compression,
+  HFileContext createFileContext(Compression.Algorithm compression,
     boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context 
encryptionContext) {
     if (compression == null) {
       compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
new file mode 100644
index 0000000..9094e58
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * A StoreFlusher that writes hfiles directly into the actual store directory,
+ * instead of a temp dir.
+ */
[email protected]
+public class PersistedEngineStoreFlusher extends DefaultStoreFlusher {
+
+  public PersistedEngineStoreFlusher(Configuration conf, HStore store) {
+    super(conf, store);
+  }
+
+  @Override
+  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+      MonitoredTask status, ThroughputController throughputController,
+      FlushLifeCycleTracker tracker) throws IOException {
+    return flushSnapshot(snapshot, cacheFlushId, status, throughputController, 
tracker,
+      s -> createWriter(snapshot.getCellsCount(),
+            s.getColumnFamilyDescriptor().getCompressionType(),
+            snapshot.isTagsPresent()));
+  }
+
+  public StoreFileWriter createWriter(long maxKeyCount, Compression.Algorithm 
compression,
+      boolean includesTag) throws IOException {
+    Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(),
+      store.getColumnFamilyName());
+    HFileContext hFileContext = store.createFileContext(compression, true,
+      includesTag, store.getStoreContext().getEncryptionContext());
+
+    StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, 
store.getCacheConfig(),
+      store.getFileSystem())
+      .withOutputDir(familyDir)
+      .withBloomType(store.getColumnFamilyDescriptor().getBloomFilterType())
+      .withMaxKeyCount(maxKeyCount)
+      .withFavoredNodes(getFavoredNodes(store.getHRegion()))
+      .withFileContext(hFileContext)
+      .withShouldDropCacheBehind(false)
+      .withCompactedFilesSupplier(store::getCompactedFiles);
+    return builder.build();
+  }
+
+  private InetSocketAddress[] getFavoredNodes(HRegion region){
+    InetSocketAddress[] favoredNodes = null;
+    if (region.getRegionServerServices() != null) {
+      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
+        region.getRegionInfo().getEncodedName());
+    }
+    return favoredNodes;
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
new file mode 100644
index 0000000..0031b0e
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of DefaultStoreFlushContext that assumes committed store 
files were written
+ * directly in the store dir, and therefore, doesn't perform a rename from tmp 
dir
+ * into the store dir.
+ *
+ * To be used only when PersistedStoreEngine is configured as the StoreEngine 
implementation.
+ */
[email protected]
+public class PersistedStoreFlushContext extends DefaultStoreFlushContext {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PersistedStoreFlushContext.class);
+
+  public PersistedStoreFlushContext(HStore store, Long cacheFlushSeqNum,
+    FlushLifeCycleTracker tracker) {
+    super.init(store, cacheFlushSeqNum, tracker);
+  }
+
+
+  @Override
+  public boolean commit(MonitoredTask status) throws IOException {
+    return super.commit(p -> {
+      status.setStatus("Flushing " + this.store +
+        ": reopening file created directly in family dir");
+      HStoreFile sf = store.createStoreFileAndReader(p);
+
+      StoreFileReader r = sf.getReader();
+      this.store.storeSize.addAndGet(r.length());
+      
this.store.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf, 
r.getEntries(),
+          cacheFlushSeqNum,
+          StringUtils.TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+      }
+      return sf;
+    });
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
new file mode 100644
index 0000000..e1fb6d0
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.ChecksumType;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test class for the PersistedEngineStoreFlusher
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedEngineStoreFlusher {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedEngineStoreFlusher.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private Configuration config = new Configuration();
+  private HStore mockStore;
+  private MemStoreSnapshot mockSnapshot;
+  private String cfName = name.getMethodName()+"-CF";
+
+  @Before
+  public void setup() throws Exception {
+    Path filePath = new Path(name.getMethodName());
+    mockStore = mock(HStore.class);
+    HRegionFileSystem mockRegionFS = mock(HRegionFileSystem.class);
+    when(mockStore.getRegionFileSystem()).thenReturn(mockRegionFS);
+    when(mockRegionFS.getRegionDir()).thenReturn(filePath);
+    when(mockStore.getColumnFamilyName()).thenReturn(cfName);
+    HFileContext mockFileContext = mock(HFileContext.class);
+    when(mockFileContext.getBytesPerChecksum()).thenReturn(100);
+    StoreContext mockStoreContext = new StoreContext.Builder().build();
+    when(mockStore.createFileContext(isNull(), anyBoolean(),
+      anyBoolean(), isNull())).thenReturn(mockFileContext);
+    when(mockStore.getStoreContext()).thenReturn(mockStoreContext);
+    mockSnapshot = mock(MemStoreSnapshot.class);
+    when(mockSnapshot.getCellsCount()).thenReturn(1);
+    when(mockStore.getHRegion()).thenReturn(mock(HRegion.class));
+    ScanInfo mockScanInfo = mock(ScanInfo.class);
+    when(mockStore.getScanInfo()).thenReturn(mockScanInfo);
+    when(mockScanInfo.getComparator()).thenReturn(mock(CellComparator.class));
+    ColumnFamilyDescriptor mockDesc = mock(ColumnFamilyDescriptor.class);
+    when(mockDesc.getBloomFilterType()).thenReturn(BloomType.NONE);
+    when(mockStore.getColumnFamilyDescriptor()).thenReturn(mockDesc);
+    FileSystem mockFS = mock(FileSystem.class);
+    when(mockFS.exists(any(Path.class))).thenReturn(true);
+    FileStatus mockFileStatus = mock(FileStatus.class);
+    when(mockFileStatus.isDirectory()).thenReturn(true);
+    when(mockFS.getFileStatus(any(Path.class))).thenReturn(mockFileStatus);
+    when(mockStore.getFileSystem()).thenReturn(mockFS);
+    when(mockFS.getConf()).thenReturn(config);
+    when(mockFS.create(any(Path.class), any(FsPermission.class), 
any(Boolean.class),
+      any(Integer.class), any(Short.class), any(Long.class), any()))
+      .thenReturn(mock(FSDataOutputStream.class));
+    CacheConfig mockCacheConfig = mock(CacheConfig.class);
+    
when(mockCacheConfig.getByteBuffAllocator()).thenReturn(mock(ByteBuffAllocator.class));
+    when(mockStore.getCacheConfig()).thenReturn(mockCacheConfig);
+    
when(mockFileContext.getEncryptionContext()).thenReturn(Encryption.Context.NONE);
+    
when(mockFileContext.getCompression()).thenReturn(Compression.Algorithm.NONE);
+    when(mockFileContext.getChecksumType()).thenReturn(ChecksumType.NULL);
+    
when(mockFileContext.getCellComparator()).thenReturn(mock(CellComparator.class));
+    when(mockStore.getRegionInfo()).thenReturn(mock(RegionInfo.class));
+  }
+
+  @Test
+  public void testCreateWriter() throws Exception {
+    PersistedEngineStoreFlusher flusher = new 
PersistedEngineStoreFlusher(config, mockStore);
+    List<Path> files = flusher.flushSnapshot(mockSnapshot, 0, 
mock(MonitoredTask.class),
+      null, FlushLifeCycleTracker.DUMMY);
+    assertEquals(1, files.size());
+    //asserts the file is created in the CF dir directly, instead of a temp dir
+    Path filePath = new Path(name.getMethodName());
+    assertEquals(new Path(filePath, cfName), files.get(0).getParent());
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
new file mode 100644
index 0000000..298116e
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Test class for the PersistedStoreFlushContext
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedStoreFlushContext {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestPersistedStoreFlushContext.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Test
+  public void testCommit() throws Exception {
+    HStore mockStore = mock(HStore.class);
+    mockStore.storeSize = new AtomicLong(0);
+    mockStore.totalUncompressedBytes = new AtomicLong(0);
+    mockStore.flushedCellsCount = new AtomicLong(0);
+    mockStore.flushedCellsSize = new AtomicLong(0);
+    mockStore.flushedOutputFileSize = new AtomicLong(0);
+    Path filePath = new Path(name.getMethodName());
+    ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+    HStoreFile mockStoreFile = mock(HStoreFile.class);
+    when(mockStoreFile.getReader()).thenReturn(mock(StoreFileReader.class));
+    
when(mockStore.createStoreFileAndReader(captor.capture())).thenReturn(mockStoreFile);
+    PersistedStoreFlushContext context = new 
PersistedStoreFlushContext(mockStore,
+      0L, FlushLifeCycleTracker.DUMMY);
+    context.tempFiles = new ArrayList<>();
+    context.tempFiles.add(filePath);
+    context.committedFiles = new ArrayList<>();
+    context.snapshot = mock(MemStoreSnapshot.class);
+    context.commit(mock(MonitoredTask.class));
+    //asserts that original file didn't get renamed after the commit operation
+    assertEquals(filePath, captor.getValue());
+  }
+
+}

Reply via email to