This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch HBASE-24749
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-24749 by this push:
new 49b68b0 Flush directly into data directory, skip rename when
committing flush
49b68b0 is described below
commit 49b68b0e00c81ecaed586f5837f88ab3216dfbf0
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Jun 9 16:32:39 2021 +0100
Flush directly into data directory, skip rename when committing flush
Signed-off-by: Peter Somogyi <[email protected]>
---
.../regionserver/DefaultStoreFlushContext.java | 19 +--
.../hbase/regionserver/DefaultStoreFlusher.java | 27 ++++-
.../apache/hadoop/hbase/regionserver/HStore.java | 2 +-
.../regionserver/PersistedEngineStoreFlusher.java | 83 ++++++++++++++
.../regionserver/PersistedStoreFlushContext.java | 66 +++++++++++
.../TestPersistedEngineStoreFlusher.java | 127 +++++++++++++++++++++
.../TestPersistedStoreFlushContext.java | 77 +++++++++++++
7 files changed, 388 insertions(+), 13 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
index b97210e..d007132 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
@@ -43,12 +43,12 @@ public class DefaultStoreFlushContext extends
StoreFlushContext {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultStoreFlushContext.class);
- private MemStoreSnapshot snapshot;
- private List<Path> tempFiles;
- private List<Path> committedFiles;
- private long cacheFlushCount;
- private long cacheFlushSize;
- private long outputFileSize;
+ protected MemStoreSnapshot snapshot;
+ protected List<Path> tempFiles;
+ protected List<Path> committedFiles;
+ protected long cacheFlushCount;
+ protected long cacheFlushSize;
+ protected long outputFileSize;
public void init(HStore store, Long cacheFlushSeqNum, FlushLifeCycleTracker
tracker) {
super.init(store, cacheFlushSeqNum, tracker);
@@ -79,13 +79,18 @@ public class DefaultStoreFlushContext extends
StoreFlushContext {
@Override
public boolean commit(MonitoredTask status) throws IOException {
+ return commit(p -> store.commitFile(p, cacheFlushSeqNum, status));
+ }
+
+ protected boolean
commit(DefaultStoreFlusher.IOCheckedFunction<Path,HStoreFile> commitFunction)
+ throws IOException {
if (CollectionUtils.isEmpty(this.tempFiles)) {
return false;
}
List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
for (Path storeFilePath : tempFiles) {
try {
- HStoreFile sf = store.commitFile(storeFilePath, cacheFlushSeqNum,
status);
+ HStoreFile sf = commitFunction.apply(storeFilePath);
outputFileSize += sf.getReader().length();
storeFiles.add(sf);
} catch (IOException ex) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a7d7fb1..cad5c99 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -47,6 +47,20 @@ public class DefaultStoreFlusher extends StoreFlusher {
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
+ return flushSnapshot(snapshot, cacheFlushId, status, throughputController,
tracker,
+ s -> s.createWriterInTmp(snapshot.getCellsCount(),
+ s.getColumnFamilyDescriptor().getCompressionType(),
+ false,
+ true,
+ snapshot.isTagsPresent(),
+ false));
+ }
+
+
+ protected List<Path> flushSnapshot(MemStoreSnapshot snapshot, long
cacheFlushId,
+ MonitoredTask status, ThroughputController throughputController,
+ FlushLifeCycleTracker tracker, IOCheckedFunction<HStore,StoreFileWriter>
createWriter)
+ throws IOException {
ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -60,9 +74,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
- writer = store.createWriterInTmp(cellsCount,
- store.getColumnFamilyDescriptor().getCompressionType(), false,
true,
- snapshot.isTagsPresent(), false);
+ writer = createWriter.apply(store);
IOException e = null;
try {
performFlush(scanner, writer, throughputController);
@@ -82,9 +94,14 @@ public class DefaultStoreFlusher extends StoreFlusher {
scanner.close();
}
LOG.info("Flushed memstore data size={} at sequenceid={} (bloomFilter={}),
to={}",
- StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId,
writer.hasGeneralBloom(),
- writer.getPath());
+ StringUtils.byteDesc(snapshot.getDataSize()), cacheFlushId,
writer.hasGeneralBloom(),
+ writer.getPath());
result.add(writer.getPath());
return result;
}
+
+ @FunctionalInterface
+ public interface IOCheckedFunction<T,R> {
+ R apply(T t) throws IOException;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7b558c9..ce0c641 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1192,7 +1192,7 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
return builder.build();
}
- private HFileContext createFileContext(Compression.Algorithm compression,
+ HFileContext createFileContext(Compression.Algorithm compression,
boolean includeMVCCReadpoint, boolean includesTag, Encryption.Context
encryptionContext) {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
new file mode 100644
index 0000000..9094e58
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedEngineStoreFlusher.java
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * A StoreFlusher that writes hfiles directly into the actual store directory,
+ * instead of a temp dir..
+ */
[email protected]
+public class PersistedEngineStoreFlusher extends DefaultStoreFlusher {
+
+ public PersistedEngineStoreFlusher(Configuration conf, HStore store) {
+ super(conf, store);
+ }
+
+ @Override
+ public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
+ MonitoredTask status, ThroughputController throughputController,
+ FlushLifeCycleTracker tracker) throws IOException {
+ return flushSnapshot(snapshot, cacheFlushId, status, throughputController,
tracker,
+ s -> createWriter(snapshot.getCellsCount(),
+ s.getColumnFamilyDescriptor().getCompressionType(),
+ snapshot.isTagsPresent()));
+ }
+
+ public StoreFileWriter createWriter(long maxKeyCount, Compression.Algorithm
compression,
+ boolean includesTag) throws IOException {
+ Path familyDir = new Path(store.getRegionFileSystem().getRegionDir(),
+ store.getColumnFamilyName());
+ HFileContext hFileContext = store.createFileContext(compression, true,
+ includesTag, store.getStoreContext().getEncryptionContext());
+
+ StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf,
store.getCacheConfig(),
+ store.getFileSystem())
+ .withOutputDir(familyDir)
+ .withBloomType(store.getColumnFamilyDescriptor().getBloomFilterType())
+ .withMaxKeyCount(maxKeyCount)
+ .withFavoredNodes(getFavoredNodes(store.getHRegion()))
+ .withFileContext(hFileContext)
+ .withShouldDropCacheBehind(false)
+ .withCompactedFilesSupplier(store::getCompactedFiles);
+ return builder.build();
+ }
+
+ private InetSocketAddress[] getFavoredNodes(HRegion region){
+ InetSocketAddress[] favoredNodes = null;
+ if (region.getRegionServerServices() != null) {
+ favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
+ region.getRegionInfo().getEncodedName());
+ }
+ return favoredNodes;
+ }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
new file mode 100644
index 0000000..0031b0e
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PersistedStoreFlushContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A implementation of DefaultStoreFlushContext that assumes committed store
files were written
+ * directly in the store dir, and therefore, doesn't perform a rename from tmp
dir
+ * into the store dir.
+ *
+ * To be used only when PersistedStoreEngine is configured as the StoreEngine
implementation.
+ */
[email protected]
+public class PersistedStoreFlushContext extends DefaultStoreFlushContext {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PersistedStoreFlushContext.class);
+
+ public PersistedStoreFlushContext(HStore store, Long cacheFlushSeqNum,
+ FlushLifeCycleTracker tracker) {
+ super.init(store, cacheFlushSeqNum, tracker);
+ }
+
+
+ @Override
+ public boolean commit(MonitoredTask status) throws IOException {
+ return super.commit(p -> {
+ status.setStatus("Flushing " + this.store +
+ ": reopening file created directly in family dir");
+ HStoreFile sf = store.createStoreFileAndReader(p);
+
+ StoreFileReader r = sf.getReader();
+ this.store.storeSize.addAndGet(r.length());
+
this.store.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Added {}, entries={}, sequenceid={}, filesize={}", sf,
r.getEntries(),
+ cacheFlushSeqNum,
+ StringUtils.TraditionalBinaryPrefix.long2String(r.length(), "", 1));
+ }
+ return sf;
+ });
+ }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
new file mode 100644
index 0000000..e1fb6d0
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedEngineStoreFlusher.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.ChecksumType;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test class for the TestPersistedStoreFlushContext
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedEngineStoreFlusher {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPersistedEngineStoreFlusher.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private Configuration config = new Configuration();
+ private HStore mockStore;
+ private MemStoreSnapshot mockSnapshot;
+ private String cfName = name.getMethodName()+"-CF";
+
+ @Before
+ public void setup() throws Exception {
+ Path filePath = new Path(name.getMethodName());
+ mockStore = mock(HStore.class);
+ HRegionFileSystem mockRegionFS = mock(HRegionFileSystem.class);
+ when(mockStore.getRegionFileSystem()).thenReturn(mockRegionFS);
+ when(mockRegionFS.getRegionDir()).thenReturn(filePath);
+ when(mockStore.getColumnFamilyName()).thenReturn(cfName);
+ HFileContext mockFileContext = mock(HFileContext.class);
+ when(mockFileContext.getBytesPerChecksum()).thenReturn(100);
+ StoreContext mockStoreContext = new StoreContext.Builder().build();
+ when(mockStore.createFileContext(isNull(), anyBoolean(),
+ anyBoolean(), isNull())).thenReturn(mockFileContext);
+ when(mockStore.getStoreContext()).thenReturn(mockStoreContext);
+ mockSnapshot = mock(MemStoreSnapshot.class);
+ when(mockSnapshot.getCellsCount()).thenReturn(1);
+ when(mockStore.getHRegion()).thenReturn(mock(HRegion.class));
+ ScanInfo mockScanInfo = mock(ScanInfo.class);
+ when(mockStore.getScanInfo()).thenReturn(mockScanInfo);
+ when(mockScanInfo.getComparator()).thenReturn(mock(CellComparator.class));
+ ColumnFamilyDescriptor mockDesc = mock(ColumnFamilyDescriptor.class);
+ when(mockDesc.getBloomFilterType()).thenReturn(BloomType.NONE);
+ when(mockStore.getColumnFamilyDescriptor()).thenReturn(mockDesc);
+ FileSystem mockFS = mock(FileSystem.class);
+ when(mockFS.exists(any(Path.class))).thenReturn(true);
+ FileStatus mockFileStatus = mock(FileStatus.class);
+ when(mockFileStatus.isDirectory()).thenReturn(true);
+ when(mockFS.getFileStatus(any(Path.class))).thenReturn(mockFileStatus);
+ when(mockStore.getFileSystem()).thenReturn(mockFS);
+ when(mockFS.getConf()).thenReturn(config);
+ when(mockFS.create(any(Path.class), any(FsPermission.class),
any(Boolean.class),
+ any(Integer.class), any(Short.class), any(Long.class), any()))
+ .thenReturn(mock(FSDataOutputStream.class));
+ CacheConfig mockCacheConfig = mock(CacheConfig.class);
+
when(mockCacheConfig.getByteBuffAllocator()).thenReturn(mock(ByteBuffAllocator.class));
+ when(mockStore.getCacheConfig()).thenReturn(mockCacheConfig);
+
when(mockFileContext.getEncryptionContext()).thenReturn(Encryption.Context.NONE);
+
when(mockFileContext.getCompression()).thenReturn(Compression.Algorithm.NONE);
+ when(mockFileContext.getChecksumType()).thenReturn(ChecksumType.NULL);
+
when(mockFileContext.getCellComparator()).thenReturn(mock(CellComparator.class));
+ when(mockStore.getRegionInfo()).thenReturn(mock(RegionInfo.class));
+ }
+
+ @Test
+ public void testCreateWriter() throws Exception {
+ PersistedEngineStoreFlusher flusher = new
PersistedEngineStoreFlusher(config, mockStore);
+ List<Path> files = flusher.flushSnapshot(mockSnapshot, 0,
mock(MonitoredTask.class),
+ null, FlushLifeCycleTracker.DUMMY);
+ assertEquals(1, files.size());
+ //asserts the file is created in the CF dir directly, instead of a temp dif
+ Path filePath = new Path(name.getMethodName());
+ assertEquals(new Path(filePath, cfName), files.get(0).getParent());
+ }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
new file mode 100644
index 0000000..298116e
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPersistedStoreFlushContext.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Test class for the TestPersistedStoreFlushContext
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestPersistedStoreFlushContext {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPersistedStoreFlushContext.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Test
+ public void testCommit() throws Exception {
+ HStore mockStore = mock(HStore.class);
+ mockStore.storeSize = new AtomicLong(0);
+ mockStore.totalUncompressedBytes = new AtomicLong(0);
+ mockStore.flushedCellsCount = new AtomicLong(0);
+ mockStore.flushedCellsSize = new AtomicLong(0);
+ mockStore.flushedOutputFileSize = new AtomicLong(0);
+ Path filePath = new Path(name.getMethodName());
+ ArgumentCaptor<Path> captor = ArgumentCaptor.forClass(Path.class);
+ HStoreFile mockStoreFile = mock(HStoreFile.class);
+ when(mockStoreFile.getReader()).thenReturn(mock(StoreFileReader.class));
+
when(mockStore.createStoreFileAndReader(captor.capture())).thenReturn(mockStoreFile);
+ PersistedStoreFlushContext context = new
PersistedStoreFlushContext(mockStore,
+ 0L, FlushLifeCycleTracker.DUMMY);
+ context.tempFiles = new ArrayList<>();
+ context.tempFiles.add(filePath);
+ context.committedFiles = new ArrayList<>();
+ context.snapshot = mock(MemStoreSnapshot.class);
+ context.commit(mock(MonitoredTask.class));
+ //asserts that original file didn't get renamed after the commit operation
+ assertEquals(filePath, captor.getValue());
+ }
+
+}