http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index 95945c6..e1f31bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,12 +32,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.KeyValue; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; /** * Default implementation of StoreFileManager. Not thread-safe. @@ -47,27 +46,27 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; class DefaultStoreFileManager implements StoreFileManager { private static final Log LOG = LogFactory.getLog(DefaultStoreFileManager.class); - private final CellComparator kvComparator; + private final CellComparator cellComparator; private final CompactionConfiguration comConf; private final int blockingFileCount; - private final Comparator<StoreFile> storeFileComparator; + private final Comparator<HStoreFile> storeFileComparator; /** * List of store files inside this store. This is an immutable list that * is atomically replaced when its contents change. */ - private volatile ImmutableList<StoreFile> storefiles = null; + private volatile ImmutableList<HStoreFile> storefiles = ImmutableList.of(); /** * List of compacted files inside this store that needs to be excluded in reads * because further new reads will be using only the newly created files out of compaction. * These compacted files will be deleted/cleared once all the existing readers on these * compacted files are done. */ - private volatile List<StoreFile> compactedfiles = null; + private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of(); - public DefaultStoreFileManager(CellComparator kvComparator, - Comparator<StoreFile> storeFileComparator, Configuration conf, + public DefaultStoreFileManager(CellComparator cellComparator, + Comparator<HStoreFile> storeFileComparator, Configuration conf, CompactionConfiguration comConf) { - this.kvComparator = kvComparator; + this.cellComparator = cellComparator; this.storeFileComparator = storeFileComparator; this.comConf = comConf; this.blockingFileCount = @@ -75,39 +74,37 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public void loadFiles(List<StoreFile> storeFiles) { - sortAndSetStoreFiles(storeFiles); + public void loadFiles(List<HStoreFile> storeFiles) { + this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles); } @Override - public final Collection<StoreFile> getStorefiles() { - // TODO: I can return a null list of StoreFiles? That'll mess up clients. St.Ack 20151111 + public final Collection<HStoreFile> getStorefiles() { return storefiles; } @Override - public Collection<StoreFile> getCompactedfiles() { + public Collection<HStoreFile> getCompactedfiles() { return compactedfiles; } @Override - public void insertNewFiles(Collection<StoreFile> sfs) throws IOException { - ArrayList<StoreFile> newFiles = new ArrayList<>(storefiles); - newFiles.addAll(sfs); - sortAndSetStoreFiles(newFiles); + public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException { + this.storefiles = + ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs)); } @Override - public ImmutableCollection<StoreFile> clearFiles() { - ImmutableList<StoreFile> result = storefiles; + public ImmutableCollection<HStoreFile> clearFiles() { + ImmutableList<HStoreFile> result = storefiles; storefiles = ImmutableList.of(); return result; } @Override - public Collection<StoreFile> clearCompactedFiles() { - List<StoreFile> result = compactedfiles; - compactedfiles = new ArrayList<>(); + public Collection<HStoreFile> clearCompactedFiles() { + List<HStoreFile> result = compactedfiles; + compactedfiles = ImmutableList.of(); return result; } @@ -118,60 +115,39 @@ class DefaultStoreFileManager implements StoreFileManager { @Override public final int getCompactedFilesCount() { - if (compactedfiles == null) { - return 0; - } return compactedfiles.size(); } @Override - public void addCompactionResults( - Collection<StoreFile> newCompactedfiles, Collection<StoreFile> results) { - ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles); - newStoreFiles.removeAll(newCompactedfiles); - if (!results.isEmpty()) { - newStoreFiles.addAll(results); - } - sortAndSetStoreFiles(newStoreFiles); - ArrayList<StoreFile> updatedCompactedfiles = null; - if (this.compactedfiles != null) { - updatedCompactedfiles = new ArrayList<>(this.compactedfiles); - updatedCompactedfiles.addAll(newCompactedfiles); - } else { - updatedCompactedfiles = new ArrayList<>(newCompactedfiles); - } - markCompactedAway(newCompactedfiles); - this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); - } - - // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized - // Let a background thread close the actual reader on these compacted files and also - // ensure to evict the blocks from block cache so that they are no longer in - // cache - private void markCompactedAway(Collection<StoreFile> compactedFiles) { - for (StoreFile file : compactedFiles) { - file.markCompactedAway(); - } + public void addCompactionResults(Collection<HStoreFile> newCompactedfiles, + Collection<HStoreFile> results) { + this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables + .concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results)); + // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized + // Let a background thread close the actual reader on these compacted files and also + // ensure to evict the blocks from block cache so that they are no longer in + // cache + newCompactedfiles.forEach(HStoreFile::markCompactedAway); + this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator, + Iterables.concat(this.compactedfiles, newCompactedfiles)); } @Override - public void removeCompactedFiles(Collection<StoreFile> removedCompactedfiles) throws IOException { - ArrayList<StoreFile> updatedCompactedfiles = null; - if (this.compactedfiles != null) { - updatedCompactedfiles = new ArrayList<>(this.compactedfiles); - updatedCompactedfiles.removeAll(removedCompactedfiles); - this.compactedfiles = sortCompactedfiles(updatedCompactedfiles); - } + public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) + throws IOException { + this.compactedfiles = + this.compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf)) + .sorted(storeFileComparator).collect(ImmutableList.toImmutableList()); } @Override - public final Iterator<StoreFile> getCandidateFilesForRowKeyBefore(final KeyValue targetKey) { - return new ArrayList<>(Lists.reverse(this.storefiles)).iterator(); + public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) { + return this.storefiles.reverse().iterator(); } @Override - public Iterator<StoreFile> updateCandidateFilesForRowKeyBefore( - Iterator<StoreFile> candidateFiles, final KeyValue targetKey, final Cell candidate) { + public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore( + Iterator<HStoreFile> candidateFiles, KeyValue targetKey, Cell candidate) { // Default store has nothing useful to do here. // TODO: move this comment when implementing Level: // Level store can trim the list by range, removing all the files which cannot have @@ -180,18 +156,12 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public final byte[] getSplitPoint() throws IOException { - List<StoreFile> storefiles = this.storefiles; - if (storefiles.isEmpty()) { - return null; - } - Optional<StoreFile> largestFile = StoreUtils.getLargestFile(storefiles); - return largestFile.isPresent() - ? StoreUtils.getFileSplitPoint(largestFile.get(), kvComparator).orElse(null) : null; + public final Optional<byte[]> getSplitPoint() throws IOException { + return StoreUtils.getSplitPoint(storefiles, cellComparator); } @Override - public final Collection<StoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow, + public final Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow) { // We cannot provide any useful input and already have the files sorted by seqNum. return getStorefiles(); @@ -204,35 +174,20 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting) { - Collection<StoreFile> expiredStoreFiles = null; - ImmutableList<StoreFile> files = storefiles; + public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) { + ImmutableList<HStoreFile> files = storefiles; // 1) We can never get rid of the last file which has the maximum seqid. // 2) Files that are not the latest can't become one due to (1), so the rest are fair game. - for (int i = 0; i < files.size() - 1; ++i) { - StoreFile sf = files.get(i); + return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> { long fileTs = sf.getReader().getMaxTimestamp(); if (fileTs < maxTs && !filesCompacting.contains(sf)) { - LOG.info("Found an expired store file: " + sf.getPath() - + " whose maxTimeStamp is " + fileTs + ", which is below " + maxTs); - if (expiredStoreFiles == null) { - expiredStoreFiles = new ArrayList<>(); - } - expiredStoreFiles.add(sf); + LOG.info("Found an expired store file: " + sf.getPath() + " whose maxTimeStamp is " + + fileTs + ", which is below " + maxTs); + return true; + } else { + return false; } - } - return expiredStoreFiles; - } - - private void sortAndSetStoreFiles(List<StoreFile> storeFiles) { - Collections.sort(storeFiles, storeFileComparator); - storefiles = ImmutableList.copyOf(storeFiles); - } - - private List<StoreFile> sortCompactedfiles(List<StoreFile> storefiles) { - // Sorting may not be really needed here for the compacted files? - Collections.sort(storefiles, storeFileComparator); - return new ArrayList<>(storefiles); + }).collect(Collectors.toList()); } @Override @@ -246,7 +201,7 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public Comparator<StoreFile> getStoreFileComparator() { + public Comparator<HStoreFile> getStoreFileComparator() { return storeFileComparator; } }
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 14c3f92..a5dd9f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -39,7 +39,7 @@ public class DefaultStoreFlusher extends StoreFlusher { private static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class); private final Object flushLock = new Object(); - public DefaultStoreFlusher(Configuration conf, Store store) { + public DefaultStoreFlusher(Configuration conf, HStore store) { super(conf, store); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 43e4a17..95bbf74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; @@ -59,6 +58,7 @@ import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.IdLock; +import org.apache.yetus.audience.InterfaceAudience; /** * The store implementation to save MOBs (medium objects), it extends the HStore. @@ -166,7 +166,7 @@ public class HMobStore extends HStore { * Creates the mob store engine. */ @Override - protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf, + protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, CellComparator cellComparator) throws IOException { MobStoreEngine engine = new MobStoreEngine(); engine.createComponents(conf, store, cellComparator); @@ -291,7 +291,7 @@ public class HMobStore extends HStore { * @param path the path to the mob file */ private void validateMobFile(Path path) throws IOException { - StoreFile storeFile = null; + HStoreFile storeFile = null; try { storeFile = new HStoreFile(region.getFilesystem(), path, conf, this.mobCacheConfig, BloomType.NONE, isPrimaryReplicaStore()); @@ -301,7 +301,7 @@ public class HMobStore extends HStore { throw e; } finally { if (storeFile != null) { - storeFile.closeReader(false); + storeFile.closeStoreFile(false); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9554d7f..4fa2c70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import java.io.EOFException; @@ -100,7 +101,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Delete; @@ -147,31 +147,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; -import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.Bytes; @@ -187,6 +164,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALSplitter; @@ -195,6 +173,30 @@ import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; +import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.StoreFlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @SuppressWarnings("deprecation") @InterfaceAudience.Private @@ -1066,12 +1068,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private NavigableMap<byte[], List<Path>> getStoreFiles() { NavigableMap<byte[], List<Path>> allStoreFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (HStore store : stores.values()) { - Collection<StoreFile> storeFiles = store.getStorefiles(); + Collection<HStoreFile> storeFiles = store.getStorefiles(); if (storeFiles == null) { continue; } List<Path> storeFileNames = new ArrayList<>(); - for (StoreFile storeFile : storeFiles) { + for (HStoreFile storeFile : storeFiles) { storeFileNames.add(storeFile.getPath()); } allStoreFiles.put(store.getColumnFamilyDescriptor().getName(), storeFileNames); @@ -1124,7 +1126,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution(); stores.values().stream().filter(s -> s.getStorefiles() != null) - .flatMap(s -> s.getStorefiles().stream()).map(StoreFile::getHDFSBlockDistribution) + .flatMap(s -> s.getStorefiles().stream()).map(HStoreFile::getHDFSBlockDistribution) .forEachOrdered(hdfsBlocksDistribution::add); return hdfsBlocksDistribution; } @@ -1384,7 +1386,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.info("DEBUG LIST ALL FILES"); for (HStore store : this.stores.values()) { LOG.info("store " + store.getColumnFamilyName()); - for (StoreFile sf : store.getStorefiles()) { + for (HStoreFile sf : store.getStorefiles()) { LOG.info(sf.toStringDetailed()); } } @@ -1458,7 +1460,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * because a Snapshot was not properly persisted. The region is put in closing mode, and the * caller MUST abort after this. */ - public Map<byte[], List<StoreFile>> close() throws IOException { + public Map<byte[], List<HStoreFile>> close() throws IOException { return close(false); } @@ -1499,7 +1501,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * because a Snapshot was not properly persisted. The region is put in closing mode, and the * caller MUST abort after this. */ - public Map<byte[], List<StoreFile>> close(final boolean abort) throws IOException { + public Map<byte[], List<HStoreFile>> close(boolean abort) throws IOException { // Only allow one thread to close at a time. Serialize them so dual // threads attempting to close will run up against each other. MonitoredTask status = TaskMonitor.get().createStatus( @@ -1537,7 +1539,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH", justification="I think FindBugs is confused") - private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status) + private Map<byte[], List<HStoreFile>> doClose(boolean abort, MonitoredTask status) throws IOException { if (isClosed()) { LOG.warn("Region " + this + " already closed"); @@ -1632,13 +1634,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - Map<byte[], List<StoreFile>> result = new TreeMap<>(Bytes.BYTES_COMPARATOR); + Map<byte[], List<HStoreFile>> result = new TreeMap<>(Bytes.BYTES_COMPARATOR); if (!stores.isEmpty()) { // initialize the thread pool for closing stores in parallel. ThreadPoolExecutor storeCloserThreadPool = getStoreOpenAndCloseThreadPool("StoreCloserThread-" + getRegionInfo().getRegionNameAsString()); - CompletionService<Pair<byte[], Collection<StoreFile>>> completionService = + CompletionService<Pair<byte[], Collection<HStoreFile>>> completionService = new ExecutorCompletionService<>(storeCloserThreadPool); // close each store in parallel @@ -1654,18 +1656,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } completionService - .submit(new Callable<Pair<byte[], Collection<StoreFile>>>() { + .submit(new Callable<Pair<byte[], Collection<HStoreFile>>>() { @Override - public Pair<byte[], Collection<StoreFile>> call() throws IOException { + public Pair<byte[], Collection<HStoreFile>> call() throws IOException { return new Pair<>(store.getColumnFamilyDescriptor().getName(), store.close()); } }); } try { for (int i = 0; i < stores.size(); i++) { - Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take(); - Pair<byte[], Collection<StoreFile>> storeFiles = future.get(); - List<StoreFile> familyFiles = result.get(storeFiles.getFirst()); + Future<Pair<byte[], Collection<HStoreFile>>> future = completionService.take(); + Pair<byte[], Collection<HStoreFile>> storeFiles = future.get(); + List<HStoreFile> familyFiles = result.get(storeFiles.getFirst()); if (familyFiles == null) { familyFiles = new ArrayList<>(); result.put(storeFiles.getFirst(), familyFiles); @@ -1874,11 +1876,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getOldestHfileTs(boolean majorCompactionOnly) throws IOException { long result = Long.MAX_VALUE; for (HStore store : stores.values()) { - Collection<StoreFile> storeFiles = store.getStorefiles(); + Collection<HStoreFile> storeFiles = store.getStorefiles(); if (storeFiles == null) { continue; } - for (StoreFile file : storeFiles) { + for (HStoreFile file : storeFiles) { StoreFileReader sfReader = file.getReader(); if (sfReader == null) { continue; @@ -1888,7 +1890,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } if (majorCompactionOnly) { - byte[] val = reader.loadFileInfo().get(StoreFile.MAJOR_COMPACTION_KEY); + byte[] val = reader.loadFileInfo().get(MAJOR_COMPACTION_KEY); if (val == null || !Bytes.toBoolean(val)) { continue; } @@ -4182,7 +4184,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If this flag is set, make use of the hfile archiving by making recovered.edits a fake // column family. Have to fake out file type too by casting our recovered.edits as storefiles String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName(); - Set<StoreFile> fakeStoreFiles = new HashSet<>(files.size()); + Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size()); for (Path file: files) { fakeStoreFiles.add( new HStoreFile(getRegionFileSystem().getFileSystem(), file, this.conf, null, null, true)); @@ -5296,11 +5298,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throw new IllegalArgumentException( "No column family : " + new String(column) + " available"); } - Collection<StoreFile> storeFiles = store.getStorefiles(); + Collection<HStoreFile> storeFiles = store.getStorefiles(); if (storeFiles == null) { continue; } - for (StoreFile storeFile : storeFiles) { + for (HStoreFile storeFile : storeFiles) { storeFileNames.add(storeFile.getPath().toString()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 3cb5bdb..3f42466 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -25,7 +25,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; +import java.util.Optional; import java.util.UUID; import org.apache.commons.logging.Log; @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.backup.HFileArchiver; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.fs.HFileSystem; @@ -54,6 +53,7 @@ import org.apache.hadoop.hbase.util.FSHDFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; @@ -482,20 +482,6 @@ public class HRegionFileSystem { } /** - * Moves multiple store files to the relative region's family store directory. - * @param storeFiles list of store files divided by family - * @throws IOException - */ - void commitStoreFiles(final Map<byte[], List<StoreFile>> storeFiles) throws IOException { - for (Map.Entry<byte[], List<StoreFile>> es: storeFiles.entrySet()) { - String familyName = Bytes.toString(es.getKey()); - for (StoreFile sf: es.getValue()) { - commitStoreFile(familyName, sf.getPath()); - } - } - } - - /** * Archives the specified store file from the specified family. * @param familyName Family that contains the store files * @param filePath {@link Path} to the store file to remove @@ -513,7 +499,7 @@ public class HRegionFileSystem { * @param storeFiles set of store files to remove * @throws IOException if the archiving fails */ - public void removeStoreFiles(final String familyName, final Collection<StoreFile> storeFiles) + public void removeStoreFiles(String familyName, Collection<HStoreFile> storeFiles) throws IOException { HFileArchiver.archiveStoreFiles(this.conf, this.fs, this.regionInfoForFs, this.tableDir, Bytes.toBytes(familyName), storeFiles); @@ -671,9 +657,8 @@ public class HRegionFileSystem { * @return Path to created reference. * @throws IOException */ - public Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f, - final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy) - throws IOException { + public Path splitStoreFile(HRegionInfo hri, String familyName, HStoreFile f, byte[] splitRow, + boolean top, RegionSplitPolicy splitPolicy) throws IOException { if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) { // Check whether the split row lies in the range of the store file // If it is outside the range, return directly. @@ -682,28 +667,28 @@ public class HRegionFileSystem { if (top) { //check if larger than last key. Cell splitKey = CellUtil.createFirstOnRow(splitRow); - Cell lastKey = f.getLastKey(); + Optional<Cell> lastKey = f.getLastKey(); // If lastKey is null means storefile is empty. - if (lastKey == null) { + if (!lastKey.isPresent()) { return null; } - if (f.getComparator().compare(splitKey, lastKey) > 0) { + if (f.getComparator().compare(splitKey, lastKey.get()) > 0) { return null; } } else { //check if smaller than first key Cell splitKey = CellUtil.createLastOnRow(splitRow); - Cell firstKey = f.getFirstKey(); + Optional<Cell> firstKey = f.getFirstKey(); // If firstKey is null means storefile is empty. - if (firstKey == null) { + if (!firstKey.isPresent()) { return null; } - if (f.getComparator().compare(splitKey, firstKey) < 0) { + if (f.getComparator().compare(splitKey, firstKey.get()) < 0) { return null; } } } finally { - f.closeReader(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true); + f.closeStoreFile(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true); } } @@ -791,9 +776,8 @@ public class HRegionFileSystem { * @return Path to created reference. * @throws IOException */ - public Path mergeStoreFile(final HRegionInfo mergedRegion, final String familyName, - final StoreFile f, final Path mergedDir) - throws IOException { + public Path mergeStoreFile(HRegionInfo mergedRegion, String familyName, HStoreFile f, + Path mergedDir) throws IOException { Path referenceDir = new Path(new Path(mergedDir, mergedRegion.getEncodedName()), familyName); // A whole reference to the store file. http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f648c2f..394826c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; - import java.io.IOException; import java.io.InterruptedIOException; import java.lang.Thread.UncaughtExceptionHandler; @@ -84,7 +83,6 @@ import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; @@ -141,42 +139,6 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.trace.SpanReceiverHost; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; @@ -210,10 +172,48 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; + import sun.misc.Signal; import sun.misc.SignalHandler; @@ -3172,7 +3172,7 @@ public class HRegionServer extends HasThread implements for (int i = 0; i < regionEncodedName.size(); ++i) { Region regionToClose = this.getFromOnlineRegions(regionEncodedName.get(i)); if (regionToClose != null) { - Map<byte[], List<StoreFile>> hstoreFiles = null; + Map<byte[], List<HStoreFile>> hstoreFiles = null; Exception exceptionToThrow = null; try{ hstoreFiles = ((HRegion)regionToClose).close(false);