http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 6edf006..86a24ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -17,6 +17,59 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; + +import java.io.EOFException; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.text.ParseException; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Optional; +import java.util.RandomAccess; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -90,6 +143,7 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.Write import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -97,7 +151,6 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Optional; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; @@ -143,58 +196,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.lang.reflect.Constructor; -import java.nio.ByteBuffer; -import java.text.ParseException; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.RandomAccess; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; - -import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; -import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; - @SuppressWarnings("deprecation") @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { @@ -254,9 +255,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // - the thread that owns the lock (allow reentrancy) // - reference count of (reentrant) locks held by the thread // - the row itself - private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows = + new ConcurrentHashMap<>(); - protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); + protected final Map<byte[], HStore> stores = + new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR); // TODO: account for each registered handler in HeapSize computation private Map<String, com.google.protobuf.Service> coprocessorServiceHandlers = Maps.newHashMap(); @@ -513,7 +516,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** A result object from prepare flush cache stage */ @VisibleForTesting static class PrepareFlushResult { - final FlushResult result; // indicating a failure result from prepare + final FlushResultImpl result; // indicating a failure result from prepare final TreeMap<byte[], StoreFlushContext> storeFlushCtxs; final TreeMap<byte[], List<Path>> committedFiles; final TreeMap<byte[], MemstoreSize> storeFlushableSize; @@ -523,7 +526,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final MemstoreSize totalFlushableSize; /** Constructs an early exit case */ - PrepareFlushResult(FlushResult result, long flushSeqId) { + PrepareFlushResult(FlushResultImpl result, long flushSeqId) { this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, new MemstoreSize()); } @@ -538,7 +541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } private PrepareFlushResult( - FlushResult result, + FlushResultImpl result, TreeMap<byte[], StoreFlushContext> storeFlushCtxs, TreeMap<byte[], List<Path>> committedFiles, TreeMap<byte[], MemstoreSize> storeFlushableSize, long startTime, long flushSeqId, @@ -616,7 +619,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final long rowProcessorTimeout; // Last flush time for each Store. Useful when we are flushing for each column - private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>(); final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; @@ -802,7 +805,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.disallowWritesInRecovering = conf.getBoolean(HConstants.DISALLOW_WRITES_IN_RECOVERING, HConstants.DEFAULT_DISALLOW_WRITES_IN_RECOVERING_CONFIG); - configurationManager = Optional.absent(); + configurationManager = Optional.empty(); // disable stats tracking system tables, but check the config for everything else this.regionStatsEnabled = htd.getTableName().getNamespaceAsString().equals( @@ -902,22 +905,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long maxSeqId = initializeStores(reporter, status); this.mvcc.advanceTo(maxSeqId); if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) { - List<Store> stores = this.getStores(); // update the stores that we are replaying + Collection<HStore> stores = this.stores.values(); try { - for (Store store : stores) { - ((HStore) store).startReplayingFromWAL(); - } + // update the stores that we are replaying + stores.forEach(HStore::startReplayingFromWAL); // Recover any edits if available. maxSeqId = Math.max(maxSeqId, - replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); + replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status)); // Make sure mvcc is up to max. this.mvcc.advanceTo(maxSeqId); } finally { - for (Store store : stores) { // update the stores that we are done replaying - ((HStore)store).stopReplayingFromWAL(); - } + // update the stores that we are done replaying + stores.forEach(HStore::startReplayingFromWAL); } - } this.lastReplayedOpenRegionSeqId = maxSeqId; @@ -947,7 +947,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.flushPolicy = FlushPolicyFactory.create(this, conf); long lastFlushTime = EnvironmentEdgeManager.currentTime(); - for (Store store: stores.values()) { + for (HStore store: stores.values()) { this.lastStoreFlushTimeMap.put(store, lastFlushTime); } @@ -988,10 +988,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Highest sequenceId found out in a Store. * @throws IOException */ - private long initializeStores(final CancelableProgressable reporter, MonitoredTask status) - throws IOException { + private long initializeStores(CancelableProgressable reporter, MonitoredTask status) + throws IOException { // Load in all the HStores. - long maxSeqId = -1; // initialized to -1 so that we pick up MemstoreTS from column families long maxMemstoreTS = -1; @@ -1050,11 +1049,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!allStoresOpened) { // something went wrong, close all opened stores LOG.error("Could not initialize all stores for the region=" + this); - for (Store store : this.stores.values()) { + for (HStore store : this.stores.values()) { try { store.close(); } catch (IOException e) { - LOG.warn(e.getMessage()); + LOG.warn("close store failed", e); } } } @@ -1079,11 +1078,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private NavigableMap<byte[], List<Path>> getStoreFiles() { NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (Store store: getStores()) { + for (HStore store : stores.values()) { Collection<StoreFile> storeFiles = store.getStorefiles(); - if (storeFiles == null) continue; + if (storeFiles == null) { + continue; + } List<Path> storeFileNames = new ArrayList<>(); - for (StoreFile storeFile: storeFiles) { + for (StoreFile storeFile : storeFiles) { storeFileNames.add(storeFile.getPath()); } allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames); @@ -1121,10 +1122,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return True if this region has references. */ public boolean hasReferences() { - for (Store store : this.stores.values()) { - if (store.hasReferences()) return true; - } - return false; + return stores.values().stream().anyMatch(HStore::hasReferences); } public void blockUpdates() { @@ -1137,19 +1135,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { - HDFSBlocksDistribution hdfsBlocksDistribution = - new HDFSBlocksDistribution(); - synchronized (this.stores) { - for (Store store : this.stores.values()) { - Collection<StoreFile> storeFiles = store.getStorefiles(); - if (storeFiles == null) continue; - for (StoreFile sf : storeFiles) { - HDFSBlocksDistribution storeFileBlocksDistribution = - sf.getHDFSBlockDistribution(); - hdfsBlocksDistribution.add(storeFileBlocksDistribution); - } - } - } + HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); + stores.values().stream().filter(s -> s.getStorefiles() != null) + .flatMap(s -> s.getStorefiles().stream()).map(StoreFile::getHDFSBlockDistribution) + .forEachOrdered(hdfsBlocksDistribution::add); return hdfsBlocksDistribution; } @@ -1161,8 +1150,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return The HDFS blocks distribution for the given region. * @throws IOException */ - public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf, - final TableDescriptor tableDescriptor, final HRegionInfo regionInfo) throws IOException { + public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, + TableDescriptor tableDescriptor, HRegionInfo regionInfo) throws IOException { Path tablePath = FSUtils.getTableDir(FSUtils.getRootDir(conf), tableDescriptor.getTableName()); return computeHDFSBlocksDistribution(conf, tableDescriptor, regionInfo, tablePath); } @@ -1176,9 +1165,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return The HDFS blocks distribution for the given region. * @throws IOException */ - public static HDFSBlocksDistribution computeHDFSBlocksDistribution(final Configuration conf, - final TableDescriptor tableDescriptor, final HRegionInfo regionInfo, Path tablePath) - throws IOException { + public static HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf, + TableDescriptor tableDescriptor, HRegionInfo regionInfo, Path tablePath) throws IOException { HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); FileSystem fs = tablePath.getFileSystem(conf); @@ -1407,9 +1395,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new Throwable("LOGGING: REMOVE")); // REMOVE BELOW!!!! LOG.info("DEBUG LIST ALL FILES"); - for (Store store: this.stores.values()) { + for (HStore store : this.stores.values()) { LOG.info("store " + store.getColumnFamilyName()); - for (StoreFile sf: store.getStorefiles()) { + for (StoreFile sf : store.getStorefiles()) { LOG.info(sf.toStringDetailed()); } } @@ -1667,7 +1655,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi new ExecutorCompletionService<>(storeCloserThreadPool); // close each store in parallel - for (final Store store : stores.values()) { + for (HStore store : stores.values()) { MemstoreSize flushableSize = store.getSizeToFlush(); if (!(abort || flushableSize.getDataSize() == 0 || writestate.readOnly)) { if (getRegionServerServices() != null) { @@ -1740,11 +1728,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } private long getMemstoreHeapSize() { - long size = 0; - for (Store s : this.stores.values()) { - size += s.getSizeOfMemStore().getHeapSize(); - } - return size; + return stores.values().stream().mapToLong(s -> s.getSizeOfMemStore().getHeapSize()).sum(); } @Override @@ -1902,17 +1886,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException { long result = Long.MAX_VALUE; - for (Store store : getStores()) { + for (HStore store : stores.values()) { Collection<StoreFile> storeFiles = store.getStorefiles(); - if (storeFiles == null) continue; + if (storeFiles == null) { + continue; + } for (StoreFile file : storeFiles) { StoreFileReader sfReader = file.getReader(); - if (sfReader == null) continue; + if (sfReader == null) { + continue; + } HFile.Reader reader = sfReader.getHFileReader(); - if (reader == null) continue; + if (reader == null) { + continue; + } if (majorCompactionOnly) { byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY); - if (val == null || !Bytes.toBoolean(val)) continue; + if (val == null || !Bytes.toBoolean(val)) { + continue; + } } result = Math.min(result, reader.getFileContext().getFileCreateTime()); } @@ -1942,20 +1934,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // These methods are meant to be called periodically by the HRegionServer for // upkeep. ////////////////////////////////////////////////////////////////////////////// - - /** @return returns size of largest HStore. */ + /** + * @return returns size of largest HStore. + */ public long getLargestHStoreSize() { - long size = 0; - for (Store h : stores.values()) { - long storeSize = h.getSize(); - if (storeSize > size) { - size = storeSize; - } - } - return size; + return stores.values().stream().mapToLong(HStore::getSize).max().orElse(0L); } - /* + /** * Do preparation for pending compaction. * @throws IOException */ @@ -1964,19 +1950,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void triggerMajorCompaction() throws IOException { - for (Store s : getStores()) { - s.triggerMajorCompaction(); - } + stores.values().forEach(HStore::triggerMajorCompaction); } @Override - public void compact(final boolean majorCompaction) throws IOException { + public void compact(boolean majorCompaction) throws IOException { if (majorCompaction) { triggerMajorCompaction(); } - for (Store s : getStores()) { - CompactionContext compaction = s.requestCompaction(); - if (compaction != null) { + for (HStore s : stores.values()) { + Optional<CompactionContext> compaction = s.requestCompaction(); + if (compaction.isPresent()) { ThroughputController controller = null; if (rsServices != null) { controller = CompactionThroughputControllerFactory.create(rsServices, conf); @@ -1984,43 +1968,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (controller == null) { controller = NoLimitThroughputController.INSTANCE; } - compact(compaction, s, controller, null); + compact(compaction.get(), s, controller, null); } } } /** - * This is a helper function that compact all the stores synchronously + * This is a helper function that compact all the stores synchronously. + * <p> * It is used by utilities and testing - * - * @throws IOException e */ + @VisibleForTesting public void compactStores() throws IOException { - for (Store s : getStores()) { - CompactionContext compaction = s.requestCompaction(); - if (compaction != null) { - compact(compaction, s, NoLimitThroughputController.INSTANCE, null); + for (HStore s : stores.values()) { + Optional<CompactionContext> compaction = s.requestCompaction(); + if (compaction.isPresent()) { + compact(compaction.get(), s, NoLimitThroughputController.INSTANCE, null); } } } /** - * This is a helper function that compact the given store + * This is a helper function that compact the given store. + * <p> * It is used by utilities and testing - * - * @throws IOException e */ @VisibleForTesting - void compactStore(byte[] family, ThroughputController throughputController) - throws IOException { - Store s = getStore(family); - CompactionContext compaction = s.requestCompaction(); - if (compaction != null) { - compact(compaction, s, throughputController, null); + void compactStore(byte[] family, ThroughputController throughputController) throws IOException { + HStore s = getStore(family); + Optional<CompactionContext> compaction = s.requestCompaction(); + if (compaction.isPresent()) { + compact(compaction.get(), s, throughputController, null); } } - /* + /** * Called by compaction thread and after region is opened to compact the * HStores if necessary. * @@ -2035,12 +2017,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param throughputController * @return whether the compaction completed */ - public boolean compact(CompactionContext compaction, Store store, + public boolean compact(CompactionContext compaction, HStore store, ThroughputController throughputController) throws IOException { return compact(compaction, store, throughputController, null); } - public boolean compact(CompactionContext compaction, Store store, + public boolean compact(CompactionContext compaction, HStore store, ThroughputController throughputController, User user) throws IOException { assert compaction != null && compaction.hasSelection(); assert !compaction.getRequest().getFiles().isEmpty(); @@ -2214,7 +2196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * because a Snapshot was not properly persisted. The region is put in closing mode, and the * caller MUST abort after this. */ - public FlushResult flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) + public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) throws IOException { // fail-fast instead of waiting on the lock if (this.closing.get()) { @@ -2261,10 +2243,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } try { - Collection<Store> specificStoresToFlush = + Collection<HStore> specificStoresToFlush = forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); - FlushResult fs = internalFlushcache(specificStoresToFlush, - status, writeFlushRequestWalMarker); + FlushResultImpl fs = + internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker); if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); @@ -2297,7 +2279,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * that you always flush all stores). Otherwise the method will always * returns true which will make a lot of flush requests. */ - boolean shouldFlushStore(Store store) { + boolean shouldFlushStore(HStore store) { long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store.getColumnFamilyDescriptor().getName()) - 1; if (earliest > 0 && earliest + flushPerChanges < mvcc.getReadPoint()) { @@ -2349,7 +2331,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } //since we didn't flush in the recent past, flush now if certain conditions //are met. Return true on first such memstore hit. - for (Store s : getStores()) { + for (Store s : stores.values()) { if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) { // we have an old enough edit in the memstore, flush whyFlush.append(s.toString() + " has an old edit so flush to free WALs"); @@ -2361,39 +2343,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Flushing all stores. - * * @see #internalFlushcache(Collection, MonitoredTask, boolean) */ - private FlushResult internalFlushcache(MonitoredTask status) - throws IOException { + private FlushResult internalFlushcache(MonitoredTask status) throws IOException { return internalFlushcache(stores.values(), status, false); } /** * Flushing given stores. - * * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean) */ - private FlushResult internalFlushcache(final Collection<Store> storesToFlush, - MonitoredTask status, boolean writeFlushWalMarker) throws IOException { - return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, - status, writeFlushWalMarker); + private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status, + boolean writeFlushWalMarker) throws IOException { + return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status, + writeFlushWalMarker); } /** - * Flush the memstore. Flushing the memstore is a little tricky. We have a lot - * of updates in the memstore, all of which have also been written to the wal. - * We need to write those updates in the memstore out to disk, while being - * able to process reads/writes as much as possible during the flush - * operation. + * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the + * memstore, all of which have also been written to the wal. We need to write those updates in the + * memstore out to disk, while being able to process reads/writes as much as possible during the + * flush operation. * <p> - * This method may block for some time. Every time you call it, we up the - * regions sequence id even if we don't flush; i.e. the returned region id - * will be at least one larger than the last edit applied to this region. The - * returned id does not refer to an actual edit. The returned id can be used - * for say installing a bulk loaded file just ahead of the last hfile that was - * the result of this flush, etc. - * + * This method may block for some time. Every time you call it, we up the regions sequence id even + * if we don't flush; i.e. the returned region id will be at least one larger than the last edit + * applied to this region. The returned id does not refer to an actual edit. The returned id can + * be used for say installing a bulk loaded file just ahead of the last hfile that was the result + * of this flush, etc. * @param wal Null if we're NOT to go via wal. * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file. * @param storesToFlush The list of stores to flush. @@ -2401,9 +2377,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of WAL is required. */ - protected FlushResult internalFlushcache(final WAL wal, final long myseqid, - final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) - throws IOException { + protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection<HStore> storesToFlush, + MonitoredTask status, boolean writeFlushWalMarker) throws IOException { PrepareFlushResult result = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker); if (result.result == null) { @@ -2415,9 +2390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE", justification="FindBugs seems confused about trxId") - protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid, - final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) - throws IOException { + protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, + Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) + throws IOException { if (this.rsServices != null && this.rsServices.isAborted()) { // Don't flush when server aborting, it's unsafe throw new IOException("Aborting flush because server is aborted..."); @@ -2439,11 +2414,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for // sure just beyond the last appended region edit and not associated with any edit // (useful as marker when bulk loading, etc.). - FlushResult flushResult = null; if (wal != null) { writeEntry = mvcc.begin(); long flushOpSeqId = writeEntry.getWriteNumber(); - flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); mvcc.completeAndWait(writeEntry); @@ -2479,9 +2453,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MemstoreSize totalSizeOfFlushableStores = new MemstoreSize(); Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>(); - for (Store store: storesToFlush) { + for (HStore store : storesToFlush) { flushedFamilyNamesToSeq.put(store.getColumnFamilyDescriptor().getName(), - ((HStore) store).preFlushSeqIDEstimation()); + store.preFlushSeqIDEstimation()); } TreeMap<byte[], StoreFlushContext> storeFlushCtxs = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -2517,7 +2491,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi flushedSeqId = flushOpSeqId = myseqid; } - for (Store s : storesToFlush) { + for (HStore s : storesToFlush) { MemstoreSize flushableSize = s.getSizeToFlush(); totalSizeOfFlushableStores.incMemstoreSize(flushableSize); storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), s.createFlushContext(flushOpSeqId)); @@ -2555,7 +2529,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Utility method broken out of internalPrepareFlushCache so that method is smaller. */ - private void logFatLineOnFlush(final Collection<Store> storesToFlush, final long sequenceId) { + private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId) { if (!LOG.isInfoEnabled()) { return; } @@ -2563,7 +2537,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi StringBuilder perCfExtras = null; if (!isAllFamilies(storesToFlush)) { perCfExtras = new StringBuilder(); - for (Store store: storesToFlush) { + for (HStore store: storesToFlush) { perCfExtras.append("; ").append(store.getColumnFamilyName()); perCfExtras.append("=") .append(StringUtils.byteDesc(store.getSizeToFlush().getDataSize())); @@ -2611,7 +2585,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * @return True if passed Set is all families in the region. */ - private boolean isAllFamilies(final Collection<Store> families) { + private boolean isAllFamilies(Collection<HStore> families) { return families == null || this.stores.size() == families.size(); } @@ -2639,11 +2613,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", justification="Intentional; notify is about completed flush") - protected FlushResult internalFlushCacheAndCommit( - final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult, - final Collection<Store> storesToFlush) - throws IOException { - + protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status, + PrepareFlushResult prepareResult, Collection<HStore> storesToFlush) throws IOException { // prepare flush context is carried via PrepareFlushResult TreeMap<byte[], StoreFlushContext> storeFlushCtxs = prepareResult.storeFlushCtxs; TreeMap<byte[], List<Path>> committedFiles = prepareResult.committedFiles; @@ -2673,7 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). - Iterator<Store> it = storesToFlush.iterator(); + Iterator<HStore> it = storesToFlush.iterator(); // stores.values() and storeFlushCtxs have same order for (StoreFlushContext flush : storeFlushCtxs.values()) { boolean needsCompaction = flush.commit(status); @@ -2746,7 +2717,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Record latest flush time - for (Store store: storesToFlush) { + for (HStore store: storesToFlush) { this.lastStoreFlushTimeMap.put(store, startTime); } @@ -4002,34 +3973,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /* + /** * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be - * set; when set we will run operations that make sense in the increment/append scenario but - * that do not make sense otherwise. - * @see #applyToMemstore(Store, Cell, long) + * set; when set we will run operations that make sense in the increment/append scenario + * but that do not make sense otherwise. + * @see #applyToMemstore(HStore, Cell, long) */ - private void applyToMemstore(final Store store, final List<Cell> cells, final boolean delta, + private void applyToMemstore(HStore store, List<Cell> cells, boolean delta, MemstoreSize memstoreSize) throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1; if (upsert) { - ((HStore) store).upsert(cells, getSmallestReadPoint(), memstoreSize); + store.upsert(cells, getSmallestReadPoint(), memstoreSize); } else { - ((HStore) store).add(cells, memstoreSize); + store.add(cells, memstoreSize); } } - /* - * @see #applyToMemstore(Store, List, boolean, boolean, long) + /** + * @see #applyToMemstore(HStore, List, boolean, boolean, long) */ - private void applyToMemstore(final Store store, final Cell cell, MemstoreSize memstoreSize) - throws IOException { + private void applyToMemstore(HStore store, Cell cell, MemstoreSize memstoreSize) + throws IOException { // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! if (store == null) { checkFamily(CellUtil.cloneFamily(cell)); // Unreachable because checkFamily will throw exception } - ((HStore) store).add(cell, memstoreSize); + store.add(cell, memstoreSize); } @Override @@ -4368,7 +4339,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Figure which store the edit is meant for. if (store == null || !CellUtil.matchingFamily(cell, store.getColumnFamilyDescriptor().getName())) { - store = getHStore(cell); + store = getStore(cell); } if (store == null) { // This should never happen. Perhaps schema was changed between @@ -4497,7 +4468,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.REPLAY_EVENT); try { - HStore store = this.getHStore(compaction.getFamilyName().toByteArray()); + HStore store = this.getStore(compaction.getFamilyName().toByteArray()); if (store == null) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Found Compaction WAL edit for deleted family:" @@ -4567,10 +4538,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException { long flushSeqId = flush.getFlushSequenceNumber(); - HashSet<Store> storesToFlush = new HashSet<>(); + HashSet<HStore> storesToFlush = new HashSet<>(); for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) { byte[] family = storeFlush.getFamilyName().toByteArray(); - Store store = getStore(family); + HStore store = getStore(family); if (store == null) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a flush start marker from primary, but the family is not found. Ignoring" @@ -4807,7 +4778,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) { byte[] family = storeFlush.getFamilyName().toByteArray(); - Store store = getStore(family); + HStore store = getStore(family); if (store == null) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a flush commit marker from primary, but the family is not found." @@ -4843,7 +4814,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * if the memstore edits have seqNums smaller than the given seq id * @throws IOException */ - private MemstoreSize dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException { + private MemstoreSize dropMemstoreContentsForSeqId(long seqId, HStore store) throws IOException { MemstoreSize totalFreedSize = new MemstoreSize(); this.updatesLock.writeLock().lock(); try { @@ -4857,7 +4828,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Prepare flush (take a snapshot) and then abort (drop the snapshot) if (store == null) { - for (Store s : stores.values()) { + for (HStore s : stores.values()) { totalFreedSize.incMemstoreSize(doDropStoreMemstoreContentsForSeqId(s, currentSeqId)); } } else { @@ -4874,7 +4845,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return totalFreedSize; } - private MemstoreSize doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) + private MemstoreSize doDropStoreMemstoreContentsForSeqId(HStore s, long currentSeqId) throws IOException { MemstoreSize flushableSize = s.getSizeToFlush(); this.decrMemstoreSize(flushableSize); @@ -4965,7 +4936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (StoreDescriptor storeDescriptor : regionEvent.getStoresList()) { // stores of primary may be different now byte[] family = storeDescriptor.getFamilyName().toByteArray(); - Store store = getStore(family); + HStore store = getStore(family); if (store == null) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a region open marker from primary, but the family is not found. " @@ -5081,7 +5052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) { // stores of primary may be different now family = storeDescriptor.getFamilyName().toByteArray(); - HStore store = getHStore(family); + HStore store = getStore(family); if (store == null) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "Received a bulk load marker from primary, but the family is not found. " @@ -5119,9 +5090,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (writestate.flushing) { boolean canDrop = true; if (prepareFlushResult.storeFlushCtxs != null) { - for (Entry<byte[], StoreFlushContext> entry - : prepareFlushResult.storeFlushCtxs.entrySet()) { - Store store = getStore(entry.getKey()); + for (Entry<byte[], StoreFlushContext> entry : prepareFlushResult.storeFlushCtxs + .entrySet()) { + HStore store = getStore(entry.getKey()); if (store == null) { continue; } @@ -5164,9 +5135,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(); // obtain region close lock try { - Map<Store, Long> map = new HashMap<>(); + Map<HStore, Long> map = new HashMap<>(); synchronized (writestate) { - for (Store store : getStores()) { + for (HStore store : stores.values()) { // TODO: some stores might see new data from flush, while others do not which // MIGHT break atomic edits across column families. long maxSeqIdBefore = store.getMaxSequenceId(); @@ -5207,10 +5178,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi dropPrepareFlushIfPossible(); // advance the mvcc read point so that the new flushed files are visible. - // either greater than flush seq number or they were already picked up via flush. - for (Store s : getStores()) { - mvcc.advanceTo(s.getMaxMemstoreTS()); - } + // either greater than flush seq number or they were already picked up via flush. + for (HStore s : stores.values()) { + mvcc.advanceTo(s.getMaxMemstoreTS()); + } // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely @@ -5222,7 +5193,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } if (!map.isEmpty()) { - for (Map.Entry<Store, Long> entry : map.entrySet()) { + for (Map.Entry<HStore, Long> entry : map.entrySet()) { // Drop the memstore contents if they are now smaller than the latest seen flushed file totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()) .getDataSize(); @@ -5242,13 +5213,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private void logRegionFiles() { if (LOG.isTraceEnabled()) { LOG.trace(getRegionInfo().getEncodedName() + " : Store files for region: "); - for (Store s : stores.values()) { - Collection<StoreFile> storeFiles = s.getStorefiles(); - if (storeFiles == null) continue; - for (StoreFile sf : storeFiles) { - LOG.trace(getRegionInfo().getEncodedName() + " : " + sf); - } - } + stores.values().stream().filter(s -> s.getStorefiles() != null) + .flatMap(s -> s.getStorefiles().stream()) + .forEachOrdered(sf -> LOG.trace(getRegionInfo().getEncodedName() + " : " + sf)); } } @@ -5272,17 +5239,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + " does not match this region: " + this.getRegionInfo()); } - /* + /** * Used by tests * @param s Store to add edit too. * @param cell Cell to add. * @param memstoreSize */ - protected void restoreEdit(final HStore s, final Cell cell, MemstoreSize memstoreSize) { + @VisibleForTesting + protected void restoreEdit(HStore s, Cell cell, MemstoreSize memstoreSize) { s.add(cell, memstoreSize); } - /* + /** * @param fs * @param p File to check. * @return True if file was zero-length (and if so, we'll delete it in here). @@ -5291,7 +5259,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p) throws IOException { FileStatus stat = fs.getFileStatus(p); - if (stat.getLen() > 0) return false; + if (stat.getLen() > 0) { + return false; + } LOG.warn("File " + p + " is zero-length, deleting."); fs.delete(p, false); return true; @@ -5311,49 +5281,39 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public Store getStore(final byte[] column) { - return getHStore(column); - } - - public HStore getHStore(final byte[] column) { - return (HStore) this.stores.get(column); + public HStore getStore(byte[] column) { + return this.stores.get(column); } /** - * Return HStore instance. Does not do any copy: as the number of store is limited, we - * iterate on the list. + * Return HStore instance. Does not do any copy: as the number of store is limited, we iterate on + * the list. */ - private HStore getHStore(Cell cell) { - for (Map.Entry<byte[], Store> famStore : stores.entrySet()) { - if (CellUtil.matchingFamily(cell, famStore.getKey(), 0, famStore.getKey().length)) { - return (HStore) famStore.getValue(); - } - } - - return null; + private HStore getStore(Cell cell) { + return stores.entrySet().stream().filter(e -> CellUtil.matchingFamily(cell, e.getKey())) + .map(e -> e.getValue()).findFirst().orElse(null); } @Override - public List<Store> getStores() { - List<Store> list = new ArrayList<>(stores.size()); - list.addAll(stores.values()); - return list; + public List<HStore> getStores() { + return new ArrayList<>(stores.values()); } @Override - public List<String> getStoreFileList(final byte [][] columns) - throws IllegalArgumentException { + public List<String> getStoreFileList(byte[][] columns) throws IllegalArgumentException { List<String> storeFileNames = new ArrayList<>(); - synchronized(closeLock) { - for(byte[] column : columns) { - Store store = this.stores.get(column); + synchronized (closeLock) { + for (byte[] column : columns) { + HStore store = this.stores.get(column); if (store == null) { - throw new IllegalArgumentException("No column family : " + - new String(column) + " available"); + throw new IllegalArgumentException( + "No column family : " + new String(column) + " available"); } Collection<StoreFile> storeFiles = store.getStorefiles(); - if (storeFiles == null) continue; - for (StoreFile storeFile: storeFiles) { + if (storeFiles == null) { + continue; + } + for (StoreFile storeFile : storeFiles) { storeFileNames.add(storeFile.getPath().toString()); } @@ -5368,7 +5328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi ////////////////////////////////////////////////////////////////////////////// /** Make sure this is a valid row for the HRegion */ - void checkRow(final byte [] row, String op) throws IOException { + void checkRow(byte[] row, String op) throws IOException { if (!rowIsInRange(getRegionInfo(), row)) { throw new WrongRegionException("Requested row out of range for " + op + " on HRegion " + this + ", startKey='" + @@ -5637,7 +5597,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] familyName = p.getFirst(); String path = p.getSecond(); - HStore store = getHStore(familyName); + HStore store = getStore(familyName); if (store == null) { IOException ioe = new org.apache.hadoop.hbase.DoNotRetryIOException( "No such column family " + Bytes.toStringBinary(familyName)); @@ -5697,7 +5657,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (Pair<byte[], String> p : familyPaths) { byte[] familyName = p.getFirst(); String path = p.getSecond(); - HStore store = getHStore(familyName); + HStore store = getStore(familyName); if (!familyWithFinalPath.containsKey(familyName)) { familyWithFinalPath.put(familyName, new ArrayList<>()); } @@ -5737,7 +5697,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi for (Pair<Path, Path> p : entry.getValue()) { String path = p.getFirst().toString(); Path commitedStoreFile = p.getSecond(); - HStore store = getHStore(familyName); + HStore store = getStore(familyName); try { store.bulkLoadHFile(familyName, path, commitedStoreFile); // Note the size of the store file @@ -5912,7 +5872,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { - Store store = stores.get(entry.getKey()); + HStore store = stores.get(entry.getKey()); KeyValueScanner scanner; try { scanner = store.getScanner(scan, entry.getValue(), this.readPt); @@ -7145,7 +7105,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If no WAL, need to stamp it here. CellUtil.setSequenceId(cell, sequenceId); } - applyToMemstore(getHStore(cell), cell, memstoreSize); + applyToMemstore(getStore(cell), cell, memstoreSize); } } @@ -7296,7 +7256,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return returnResults? cpResult: null; } Durability effectiveDurability = getEffectiveDurability(mutation.getDurability()); - Map<Store, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size()); + Map<HStore, List<Cell>> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size()); // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and // what to return back to the client (in 'forMemStore' and 'results' respectively). WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results); @@ -7311,7 +7271,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber()); } // Now write to MemStore. Do it a column family at a time. - for (Map.Entry<Store, List<Cell>> e : forMemStore.entrySet()) { + for (Map.Entry<HStore, List<Cell>> e : forMemStore.entrySet()) { applyToMemstore(e.getKey(), e.getValue(), true, memstoreSize); } mvcc.completeAndWait(writeEntry); @@ -7419,18 +7379,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param forMemStore Fill in here what to apply to the MemStore (by Store). * @return A WALEdit to apply to WAL or null if we are to skip the WAL. */ - private WALEdit reckonDeltas(final Operation op, final Mutation mutation, - final Durability effectiveDurability, final Map<Store, List<Cell>> forMemStore, - final List<Cell> results) - throws IOException { + private WALEdit reckonDeltas(Operation op, Mutation mutation, Durability effectiveDurability, + Map<HStore, List<Cell>> forMemStore, List<Cell> results) throws IOException { WALEdit walEdit = null; long now = EnvironmentEdgeManager.currentTime(); final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; // Process a Store/family at a time. for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) { - final byte [] columnFamilyName = entry.getKey(); + final byte[] columnFamilyName = entry.getKey(); List<Cell> deltas = entry.getValue(); - Store store = this.stores.get(columnFamilyName); + HStore store = this.stores.get(columnFamilyName); // Reckon for the Store what to apply to WAL and MemStore. List<Cell> toApply = reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results); @@ -7462,11 +7420,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return Resulting Cells after <code>deltas</code> have been applied to current * values. Side effect is our filling out of the <code>results</code> List. */ - private List<Cell> reckonDeltasByStore(final Store store, final Operation op, - final Mutation mutation, final Durability effectiveDurability, final long now, - final List<Cell> deltas, final List<Cell> results) - throws IOException { - byte [] columnFamily = store.getColumnFamilyDescriptor().getName(); + private List<Cell> reckonDeltasByStore(HStore store, Operation op, Mutation mutation, + Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results) + throws IOException { + byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); List<Cell> toApply = new ArrayList<>(deltas.size()); // Get previous values for all columns in this family. List<Cell> currentValues = get(mutation, store, deltas, @@ -7576,9 +7533,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get. * @return Return list of Cells found. */ - private List<Cell> get(final Mutation mutation, final Store store, - final List<Cell> coordinates, final IsolationLevel isolation, final TimeRange tr) - throws IOException { + private List<Cell> get(Mutation mutation, HStore store, List<Cell> coordinates, + IsolationLevel isolation, TimeRange tr) throws IOException { // Sort the cells so that they match the order that they appear in the Get results. Otherwise, // we won't be able to find the existing values if the cells are not specified in order by the // client since cells are in an array list. @@ -7653,12 +7609,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long heapSize() { - long heapSize = DEEP_OVERHEAD; - for (Store store : this.stores.values()) { - heapSize += store.heapSize(); - } // this does not take into account row locks, recent flushes, mvcc entries, and more - return heapSize; + return DEEP_OVERHEAD + stores.values().stream().mapToLong(HStore::heapSize).sum(); } @Override @@ -7813,14 +7765,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return The priority that this region should have in the compaction queue */ public int getCompactPriority() { - int count = Integer.MAX_VALUE; - for (Store store : stores.values()) { - count = Math.min(count, store.getCompactPriority()); - } - return count; + return stores.values().stream().mapToInt(HStore::getCompactPriority).min() + .orElse(Store.NO_PRIORITY); } - /** @return the coprocessor host */ @Override public RegionCoprocessorHost getCoprocessorHost() { @@ -7881,11 +7829,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // The unit for snapshot is a region. So, all stores for this region must be // prepared for snapshot operation before proceeding. if (op == Operation.SNAPSHOT) { - for (Store store : stores.values()) { - if (store instanceof HStore) { - ((HStore)store).preSnapshotOperation(); - } - } + stores.values().forEach(HStore::preSnapshotOperation); } try { if (coprocessorHost != null) { @@ -7905,11 +7849,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void closeRegionOperation(Operation operation) throws IOException { if (operation == Operation.SNAPSHOT) { - for (Store store: stores.values()) { - if (store instanceof HStore) { - ((HStore)store).postSnapshotOperation(); - } - } + stores.values().forEach(HStore::postSnapshotOperation); } lock.readLock().unlock(); if (coprocessorHost != null) { @@ -8142,9 +8082,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void registerChildren(ConfigurationManager manager) { configurationManager = Optional.of(manager); - for (Store s : this.stores.values()) { - configurationManager.get().registerObserver(s); - } + stores.values().forEach(manager::registerObserver); } /** @@ -8152,9 +8090,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @Override public void deregisterChildren(ConfigurationManager manager) { - for (Store s : this.stores.values()) { - configurationManager.get().deregisterObserver(s); - } + stores.values().forEach(configurationManager.get()::deregisterObserver); } @Override @@ -8175,7 +8111,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " "); buf.append(getRegionInfo().isMetaTable() ? " meta table " : " "); buf.append("stores: "); - for (Store s : getStores()) { + for (HStore s : stores.values()) { buf.append(s.getColumnFamilyDescriptor().getNameAsString()); buf.append(" size: "); buf.append(s.getSizeOfMemStore().getDataSize()); @@ -8188,4 +8124,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new RuntimeException(buf.toString()); } } + + @Override + public void requestCompaction(String why, int priority, CompactionLifeCycleTracker tracker, + User user) throws IOException { + ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, why, priority, tracker, + user); + } + + @Override + public void requestCompaction(byte[] family, String why, int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { + ((HRegionServer) rsServices).compactSplitThread.requestCompaction(this, + Preconditions.checkNotNull(stores.get(family)), why, priority, tracker, user); + } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6bbff36..62987c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -56,8 +56,8 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.servlet.http.HttpServlet; -import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -127,6 +127,7 @@ import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; @@ -140,6 +141,9 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; @@ -210,10 +214,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; - import sun.misc.Signal; import sun.misc.SignalHandler; @@ -1686,7 +1686,7 @@ public class HRegionServer extends HasThread implements int totalStaticBloomSizeKB = 0; long totalCompactingKVs = 0; long currentCompactedKVs = 0; - List<Store> storeList = r.getStores(); + List<? extends Store> storeList = r.getStores(); stores += storeList.size(); for (Store store : storeList) { storefiles += store.getStorefilesCount(); @@ -1772,27 +1772,32 @@ public class HRegionServer extends HasThread implements @Override protected void chore() { for (Region r : this.instance.onlineRegions.values()) { - if (r == null) + if (r == null) { continue; - for (Store s : r.getStores()) { + } + HRegion hr = (HRegion) r; + for (HStore s : hr.stores.values()) { try { long multiplier = s.getCompactionCheckMultiplier(); assert multiplier > 0; - if (iteration % multiplier != 0) continue; + if (iteration % multiplier != 0) { + continue; + } if (s.needsCompaction()) { // Queue a compaction. Will recognize if major is needed. - this.instance.compactSplitThread.requestSystemCompaction(r, s, getName() - + " requests compaction"); + this.instance.compactSplitThread.requestSystemCompaction(hr, s, + getName() + " requests compaction"); } else if (s.isMajorCompaction()) { s.triggerMajorCompaction(); - if (majorCompactPriority == DEFAULT_PRIORITY - || majorCompactPriority > ((HRegion)r).getCompactPriority()) { - this.instance.compactSplitThread.requestCompaction(r, s, getName() - + " requests major compaction; use default priority", null); + if (majorCompactPriority == DEFAULT_PRIORITY || + majorCompactPriority > hr.getCompactPriority()) { + this.instance.compactSplitThread.requestCompaction(hr, s, + getName() + " requests major compaction; use default priority", Store.NO_PRIORITY, + CompactionLifeCycleTracker.DUMMY, null); } else { - this.instance.compactSplitThread.requestCompaction(r, s, getName() - + " requests major compaction; use configured priority", - this.majorCompactPriority, null, null); + this.instance.compactSplitThread.requestCompaction(hr, s, + getName() + " requests major compaction; use configured priority", + this.majorCompactPriority, CompactionLifeCycleTracker.DUMMY, null); } } } catch (IOException e) { @@ -2146,15 +2151,14 @@ public class HRegionServer extends HasThread implements @Override public void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException { - Region r = context.getRegion(); + HRegion r = (HRegion) context.getRegion(); long masterSystemTime = context.getMasterSystemTime(); - Preconditions.checkArgument(r instanceof HRegion, "r must be an HRegion"); rpcServices.checkOpen(); LOG.info("Post open deploy tasks for " + r.getRegionInfo().getRegionNameAsString()); // Do checks to see if we need to compact (references or too many files) - for (Store s : r.getStores()) { + for (HStore s : r.stores.values()) { if (s.hasReferences() || s.needsCompaction()) { - this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); + this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region"); } } long openSeqNum = r.getOpenSeqNum(); @@ -2863,11 +2867,6 @@ public class HRegionServer extends HasThread implements return serverName; } - @Override - public CompactionRequestor getCompactionRequester() { - return this.compactSplitThread; - } - public RegionServerCoprocessorHost getRegionServerCoprocessorHost(){ return this.rsHost; } http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index f011c18..daad241 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -52,13 +53,12 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.FailedArchiveException; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.io.compress.Compression; @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner; import org.apache.hadoop.hbase.io.hfile.InvalidHFileException; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; @@ -82,8 +83,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -92,14 +91,16 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; +import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; /** * A Store holds a column family in a Region. Its a memstore and a set of zero @@ -477,7 +478,7 @@ public class HStore implements Store { /** * @param tabledir {@link Path} to where the table is being stored * @param hri {@link HRegionInfo} for the region. - * @param family {@link HColumnDescriptor} describing the column family + * @param family {@link ColumnFamilyDescriptor} describing the column family * @return Path to family/Store home directory. */ @Deprecated @@ -489,7 +490,7 @@ public class HStore implements Store { /** * @param tabledir {@link Path} to where the table is being stored * @param encodedName Encoded region name. - * @param family {@link HColumnDescriptor} describing the column family + * @param family {@link ColumnFamilyDescriptor} describing the column family * @return Path to family/Store home directory. */ @Deprecated @@ -1386,15 +1387,14 @@ public class HStore implements Store { } } - private List<StoreFile> moveCompatedFilesIntoPlace( - final CompactionRequest cr, List<Path> newFiles, User user) throws IOException { + private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles, + User user) throws IOException { List<StoreFile> sfs = new ArrayList<>(newFiles.size()); for (Path newFile : newFiles) { assert newFile != null; - final StoreFile sf = moveFileIntoPlace(newFile); + StoreFile sf = moveFileIntoPlace(newFile); if (this.getCoprocessorHost() != null) { - final Store thisStore = this; - getCoprocessorHost().postCompact(thisStore, sf, cr, user); + getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user); } assert sf != null; sfs.add(sf); @@ -1636,23 +1636,12 @@ public class HStore implements Store { } @Override - public CompactionContext requestCompaction() throws IOException { - return requestCompaction(Store.NO_PRIORITY, null); - } - - @Override - public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest) - throws IOException { - return requestCompaction(priority, baseRequest, null); - } - @Override - public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest, - User user) throws IOException { + public Optional<CompactionContext> requestCompaction(int priority, + CompactionLifeCycleTracker tracker, User user) throws IOException { // don't even select for compaction if writes are disabled if (!this.areWritesEnabled()) { - return null; + return Optional.empty(); } - // Before we do compaction, try to get rid of unneeded files to simplify things. removeUnneededFiles(); @@ -1666,7 +1655,7 @@ public class HStore implements Store { final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); boolean override = false; override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, - baseRequest, user); + tracker, user); if (override) { // Coprocessor is overriding normal file selection. compaction.forceSelect(new CompactionRequest(candidatesForCoproc)); @@ -1695,21 +1684,13 @@ public class HStore implements Store { } if (this.getCoprocessorHost() != null) { this.getCoprocessorHost().postCompactSelection( - this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest, user); - } - - // Selected files; see if we have a compaction with some custom base request. - if (baseRequest != null) { - // Update the request with what the system thinks the request should be; - // its up to the request if it wants to listen. - compaction.forceSelect( - baseRequest.combineWith(compaction.getRequest())); + this, ImmutableList.copyOf(compaction.getRequest().getFiles()), tracker, user); } // Finally, we have the resulting files list. Check if we have any files at all. request = compaction.getRequest(); - final Collection<StoreFile> selectedFiles = request.getFiles(); + Collection<StoreFile> selectedFiles = request.getFiles(); if (selectedFiles.isEmpty()) { - return null; + return Optional.empty(); } addToCompactingFiles(selectedFiles); @@ -1721,6 +1702,7 @@ public class HStore implements Store { // Set priority, either override value supplied by caller or from store. request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority()); request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName()); + request.setTracker(tracker); } } finally { this.lock.readLock().unlock(); @@ -1730,7 +1712,7 @@ public class HStore implements Store { + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); this.region.reportCompactionRequestStart(request.isMajor()); - return compaction; + return Optional.of(compaction); } /** Adds the files to compacting files. filesCompacting must be locked. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 020142d..8fa686c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -20,11 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.util.StringUtils.humanReadableInt; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; -import java.lang.management.MemoryType; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -50,6 +47,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; @@ -448,8 +446,8 @@ class MemStoreFlusher implements FlushRequester { "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { - this.server.compactSplitThread.requestSystemCompaction( - region, Thread.currentThread().getName()); + this.server.compactSplitThread.requestSystemCompaction((HRegion) region, + Thread.currentThread().getName()); } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException)e).unwrapRemoteException() : e; @@ -503,8 +501,8 @@ class MemStoreFlusher implements FlushRequester { if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestSystemCompaction( - region, Thread.currentThread().getName()); + server.compactSplitThread.requestSystemCompaction((HRegion) region, + Thread.currentThread().getName()); } } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java index 2611f69..e30ed8e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java @@ -761,7 +761,7 @@ class MetricsRegionServerWrapperImpl tempCheckAndMutateChecksFailed += r.getCheckAndMutateChecksFailed(); tempCheckAndMutateChecksPassed += r.getCheckAndMutateChecksPassed(); tempBlockedRequestsCount += r.getBlockedRequestsCount(); - List<Store> storeList = r.getStores(); + List<? extends Store> storeList = r.getStores(); tempNumStores += storeList.size(); for (Store store : storeList) { tempNumStoreFiles += store.getStorefilesCount(); http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 667b46c..dc7d3cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -95,7 +95,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable @Override public long getNumStores() { - Map<byte[],Store> stores = this.region.stores; + Map<byte[], HStore> stores = this.region.stores; if (stores == null) { return 0; } http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 02662c4..61c725b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; @@ -1538,7 +1539,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = (HRegion) getRegion(request.getRegion()); // Quota support is enabled, the requesting user is not system/super user // and a quota policy is enforced that disables compactions. if (QuotaUtil.isQuotaEnabled(getConfiguration()) && @@ -1552,7 +1553,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, LOG.info("Compacting " + region.getRegionInfo().getRegionNameAsString()); boolean major = false; byte [] family = null; - Store store = null; + HStore store = null; if (request.hasFamily()) { family = request.getFamily().toByteArray(); store = region.getStore(family); @@ -1579,12 +1580,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, + region.getRegionInfo().getRegionNameAsString() + familyLogMsg); } String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg; - if(family != null) { - regionServer.compactSplitThread.requestCompaction(region, store, log, - Store.PRIORITY_USER, null, RpcServer.getRequestUser()); + if (family != null) { + regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser()); } else { - regionServer.compactSplitThread.requestCompaction(region, log, - Store.PRIORITY_USER, null, RpcServer.getRequestUser()); + regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, + CompactionLifeCycleTracker.DUMMY, RpcServer.getRequestUser()); } return CompactRegionResponse.newBuilder().build(); } catch (IOException ie) { @@ -1606,7 +1607,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, try { checkOpen(); requestCount.increment(); - Region region = getRegion(request.getRegion()); + HRegion region = (HRegion) getRegion(request.getRegion()); LOG.info("Flushing " + region.getRegionInfo().getRegionNameAsString()); boolean shouldFlush = true; if (request.hasIfOlderThanTs()) { @@ -1617,8 +1618,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; // Go behind the curtain so we can manage writing of the flush WAL marker - HRegion.FlushResultImpl flushResult = (HRegion.FlushResultImpl) - ((HRegion)region).flushcache(true, writeFlushWalMarker); + HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker); boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region,
