joshelser commented on a change in pull request #3298:
URL: https://github.com/apache/hbase/pull/3298#discussion_r638244458
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlushContext.java
##########
@@ -0,0 +1,183 @@
+/*
+ *
Review comment:
nit: unnecessary line
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java
##########
@@ -29,15 +29,25 @@
* A store flush context carries the state required to prepare/flush/commit
the store's cache.
*/
@InterfaceAudience.Private
-interface StoreFlushContext {
+public abstract class StoreFlushContext {
+
+ protected HStore store;
+ protected long cacheFlushSeqNum;
+ protected FlushLifeCycleTracker tracker;
+
+ public StoreFlushContext(HStore store, Long cacheFlushSeqNum,
FlushLifeCycleTracker tracker){
Review comment:
It looks like it would take some work, but can we make this
a `Store` instead of an `HStore`?
It looks like most of the methods called by DefaultStoreFlushContext are
actually defined on HStore right now.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
##########
@@ -2360,149 +2364,15 @@ public void upsert(Iterable<Cell> cells, long
readpoint, MemStoreSizing memstore
}
}
- public StoreFlushContext createFlushContext(long cacheFlushId,
FlushLifeCycleTracker tracker) {
- return new StoreFlusherImpl(cacheFlushId, tracker);
- }
-
- private final class StoreFlusherImpl implements StoreFlushContext {
-
- private final FlushLifeCycleTracker tracker;
- private final long cacheFlushSeqNum;
- private MemStoreSnapshot snapshot;
- private List<Path> tempFiles;
- private List<Path> committedFiles;
- private long cacheFlushCount;
- private long cacheFlushSize;
- private long outputFileSize;
-
- private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker
tracker) {
- this.cacheFlushSeqNum = cacheFlushSeqNum;
- this.tracker = tracker;
- }
-
- /**
- * This is not thread safe. The caller should have a lock on the region or
the store.
- * If necessary, the lock can be added with the patch provided in
HBASE-10087
- */
- @Override
- public MemStoreSize prepare() {
- // passing the current sequence number of the wal - to allow bookkeeping
in the memstore
- this.snapshot = memstore.snapshot();
- this.cacheFlushCount = snapshot.getCellsCount();
- this.cacheFlushSize = snapshot.getDataSize();
- committedFiles = new ArrayList<>(1);
- return snapshot.getMemStoreSize();
- }
-
- @Override
- public void flushCache(MonitoredTask status) throws IOException {
- RegionServerServices rsService = region.getRegionServerServices();
- ThroughputController throughputController =
- rsService == null ? null : rsService.getFlushThroughputController();
- tempFiles =
- HStore.this.flushCache(cacheFlushSeqNum, snapshot, status,
throughputController, tracker);
- }
-
- @Override
- public boolean commit(MonitoredTask status) throws IOException {
- if (CollectionUtils.isEmpty(this.tempFiles)) {
- return false;
- }
- List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size());
- for (Path storeFilePath : tempFiles) {
- try {
- HStoreFile sf = HStore.this.commitFile(storeFilePath,
cacheFlushSeqNum, status);
- outputFileSize += sf.getReader().length();
- storeFiles.add(sf);
- } catch (IOException ex) {
- LOG.error("Failed to commit store file {}", storeFilePath, ex);
- // Try to delete the files we have committed before.
- for (HStoreFile sf : storeFiles) {
- Path pathToDelete = sf.getPath();
- try {
- sf.deleteStoreFile();
- } catch (IOException deleteEx) {
- LOG.error(HBaseMarkers.FATAL, "Failed to delete store file we
committed, "
- + "halting {}", pathToDelete, ex);
- Runtime.getRuntime().halt(1);
- }
- }
- throw new IOException("Failed to commit the flush", ex);
- }
- }
-
- for (HStoreFile sf : storeFiles) {
- if (HStore.this.getCoprocessorHost() != null) {
- HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
- }
- committedFiles.add(sf.getPath());
- }
-
- HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
- HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
- HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);
-
- // Add new file to store files. Clear snapshot too while we have the
Store write lock.
- return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
- }
-
- @Override
- public long getOutputFileSize() {
- return outputFileSize;
- }
-
- @Override
- public List<Path> getCommittedFiles() {
- return committedFiles;
- }
-
- /**
- * Similar to commit, but called in secondary region replicas for
replaying the
- * flush cache from primary region. Adds the new files to the store, and
drops the
- * snapshot depending on dropMemstoreSnapshot argument.
- * @param fileNames names of the flushed files
- * @param dropMemstoreSnapshot whether to drop the prepared memstore
snapshot
- */
- @Override
- public void replayFlush(List<String> fileNames, boolean
dropMemstoreSnapshot)
- throws IOException {
- List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size());
- for (String file : fileNames) {
- // open the file as a store file (hfile link, etc)
- StoreFileInfo storeFileInfo =
- getRegionFileSystem().getStoreFileInfo(getColumnFamilyName(), file);
- HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
- storeFiles.add(storeFile);
- HStore.this.storeSize.addAndGet(storeFile.getReader().length());
- HStore.this.totalUncompressedBytes
- .addAndGet(storeFile.getReader().getTotalUncompressedBytes());
- if (LOG.isInfoEnabled()) {
- LOG.info(this + " added " + storeFile + ", entries=" +
storeFile.getReader().getEntries() +
- ", sequenceid=" + storeFile.getReader().getSequenceID() + ",
filesize="
- +
TraditionalBinaryPrefix.long2String(storeFile.getReader().length(), "", 1));
- }
- }
-
- long snapshotId = -1; // -1 means do not drop
- if (dropMemstoreSnapshot && snapshot != null) {
- snapshotId = snapshot.getId();
- snapshot.close();
- }
- HStore.this.updateStorefiles(storeFiles, snapshotId);
- }
-
- /**
- * Abort the snapshot preparation. Drops the snapshot if any.
- */
- @Override
- public void abort() throws IOException {
- if (snapshot != null) {
- //We need to close the snapshot when aborting, otherwise, the segment
scanner
- //won't be closed. If we are using MSLAB, the chunk referenced by
those scanners
- //can't be released, thus memory leak
- snapshot.close();
- HStore.this.updateStorefiles(Collections.emptyList(),
snapshot.getId());
- }
+ public StoreFlushContext createFlushContext(long cacheFlushId,
FlushLifeCycleTracker tracker)
+ throws IOException {
+ Class<StoreFlushContext> flushContextClass = (Class<StoreFlushContext>)
+ conf.getClass(STORE_FLUSH_CONTEXT_CLASS_NAME,
DefaultStoreFlushContext.class);
+ try {
+ return flushContextClass.getConstructor(HStore.class, Long.class,
FlushLifeCycleTracker.class)
+ .newInstance(this, cacheFlushId, tracker);
Review comment:
Do you think it would be cleaner to move this into an `init()` method
rather than using reflection on the constructor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]