This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 751ed7d2c077bd38caa06c643e5077c1fb371645
Merge: dcc897b739 da2b7ed883
Author: Christopher Tubbs <[email protected]>
AuthorDate: Mon Mar 6 13:20:34 2023 -0500

    Merge branch '1.10' into 2.1

 .../main/java/org/apache/accumulo/server/fs/FileManager.java | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index b859e7a39a,0000000000..79f379a0cf
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@@ -1,595 -1,0 +1,596 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.blockfile.impl.CacheProvider;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
+import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource;
+import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.TabletFile;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.problems.ProblemReport;
+import org.apache.accumulo.server.problems.ProblemReportingIterator;
+import org.apache.accumulo.server.problems.ProblemReports;
+import org.apache.accumulo.server.problems.ProblemType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+
+public class FileManager {
+
+  private static final Logger log = LoggerFactory.getLogger(FileManager.class);
+
+  private int maxOpen;
+
+  private static class OpenReader implements Comparable<OpenReader> {
+    long releaseTime;
+    FileSKVIterator reader;
+    String fileName;
+
+    public OpenReader(String fileName, FileSKVIterator reader) {
+      this.fileName = fileName;
+      this.reader = reader;
+      this.releaseTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public int compareTo(OpenReader o) {
+      return Long.compare(releaseTime, o.releaseTime);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof OpenReader) {
+        return compareTo((OpenReader) obj) == 0;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return fileName.hashCode();
+    }
+  }
+
+  private Map<String,List<OpenReader>> openFiles;
+  private HashMap<FileSKVIterator,String> reservedReaders;
+
+  private Semaphore filePermits;
+
+  private Cache<String,Long> fileLenCache;
+
+  private long maxIdleTime;
+  private long slowFilePermitMillis;
+
+  private final ServerContext context;
+
+  private class IdleFileCloser implements Runnable {
+
+    @Override
+    public void run() {
+
+      long curTime = System.currentTimeMillis();
+
+      ArrayList<FileSKVIterator> filesToClose = new ArrayList<>();
+
+      // determine which files to close in a sync block, and then close the
+      // files outside of the sync block
+      synchronized (FileManager.this) {
+        Iterator<Entry<String,List<OpenReader>>> iter = openFiles.entrySet().iterator();
+        while (iter.hasNext()) {
+          Entry<String,List<OpenReader>> entry = iter.next();
+          List<OpenReader> ofl = entry.getValue();
+
+          for (Iterator<OpenReader> oflIter = ofl.iterator(); oflIter.hasNext();) {
+            OpenReader openReader = oflIter.next();
+
+            if (curTime - openReader.releaseTime > maxIdleTime) {
+
+              filesToClose.add(openReader.reader);
+              oflIter.remove();
+            }
+          }
+
+          if (ofl.isEmpty()) {
+            iter.remove();
+          }
+        }
+      }
+
+      closeReaders(filesToClose);
+
+    }
+
+  }
+
+  public FileManager(ServerContext context, int maxOpen, Cache<String,Long> fileLenCache) {
+
+    if (maxOpen <= 0) {
+      throw new IllegalArgumentException("maxOpen <= 0");
+    }
+    this.context = context;
+    this.fileLenCache = fileLenCache;
+
-    this.filePermits = new Semaphore(maxOpen, false);
++    // Creates a fair semaphore to ensure thread starvation doesn't occur
++    this.filePermits = new Semaphore(maxOpen, true);
+    this.maxOpen = maxOpen;
+
+    this.openFiles = new HashMap<>();
+    this.reservedReaders = new HashMap<>();
+
+    this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
+    ThreadPools.watchCriticalScheduledTask(
+        this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(),
+            maxIdleTime, maxIdleTime / 2, TimeUnit.MILLISECONDS));
+
+    this.slowFilePermitMillis =
+        this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
+  }
+
+  private static int countReaders(Map<String,List<OpenReader>> files) {
+    int count = 0;
+
+    for (List<OpenReader> list : files.values()) {
+      count += list.size();
+    }
+
+    return count;
+  }
+
+  private List<FileSKVIterator> takeLRUOpenFiles(int numToTake) {
+
+    ArrayList<OpenReader> openReaders = new ArrayList<>();
+
+    for (Entry<String,List<OpenReader>> entry : openFiles.entrySet()) {
+      openReaders.addAll(entry.getValue());
+    }
+
+    Collections.sort(openReaders);
+
+    ArrayList<FileSKVIterator> ret = new ArrayList<>();
+
+    for (int i = 0; i < numToTake && i < openReaders.size(); i++) {
+      OpenReader or = openReaders.get(i);
+
+      List<OpenReader> ofl = openFiles.get(or.fileName);
+      if (!ofl.remove(or)) {
+        throw new RuntimeException("Failed to remove open reader that should have been there");
+      }
+
+      if (ofl.isEmpty()) {
+        openFiles.remove(or.fileName);
+      }
+
+      ret.add(or.reader);
+    }
+
+    return ret;
+  }
+
+  private void closeReaders(Collection<FileSKVIterator> filesToClose) {
+    for (FileSKVIterator reader : filesToClose) {
+      try {
+        reader.close();
+      } catch (Exception e) {
+        log.error("Failed to close file {}", e.getMessage(), e);
+      }
+    }
+  }
+
+  private List<String> takeOpenFiles(Collection<String> files,
+      Map<FileSKVIterator,String> readersReserved) {
+    List<String> filesToOpen = Collections.emptyList();
+    for (String file : files) {
+      List<OpenReader> ofl = openFiles.get(file);
+      if (ofl != null && !ofl.isEmpty()) {
+        OpenReader openReader = ofl.remove(ofl.size() - 1);
+        readersReserved.put(openReader.reader, file);
+        if (ofl.isEmpty()) {
+          openFiles.remove(file);
+        }
+      } else {
+        if (filesToOpen.isEmpty()) {
+          filesToOpen = new ArrayList<>(files.size());
+        }
+        filesToOpen.add(file);
+      }
+    }
+    return filesToOpen;
+  }
+
+  private Map<FileSKVIterator,String> reserveReaders(KeyExtent tablet, Collection<String> files,
+      boolean continueOnFailure, CacheProvider cacheProvider) throws IOException {
+
+    if (!tablet.isMeta() && files.size() >= maxOpen) {
+      throw new IllegalArgumentException("requested files exceeds max open");
+    }
+
+    if (files.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    List<String> filesToOpen = null;
+    List<FileSKVIterator> filesToClose = Collections.emptyList();
+    Map<FileSKVIterator,String> readersReserved = new HashMap<>();
+
+    if (!tablet.isMeta()) {
+      long start = System.currentTimeMillis();
+      filePermits.acquireUninterruptibly(files.size());
+      long waitTime = System.currentTimeMillis() - start;
+
+      if (waitTime >= slowFilePermitMillis) {
-        log.info("Slow file permits request: {} ms, files requested: {}, tablet: {}", waitTime,
-            files.size(), tablet);
++        log.warn("Slow file permits request: {} ms, files requested: {}, "
++            + "max open files: {}, tablet: {}", waitTime, files.size(), maxOpen, tablet);
+      }
+    }
+
-    // now that the we are past the semaphore, we have the authority
++    // now that we are past the semaphore, we have the authority
+    // to open files.size() files
+
+    // determine what work needs to be done in sync block
+    // but do the work of opening and closing files outside
-    // a synch block
++    // the block
+    synchronized (this) {
+
+      filesToOpen = takeOpenFiles(files, readersReserved);
+
+      if (!filesToOpen.isEmpty()) {
+        int numOpen = countReaders(openFiles);
+
+        if (filesToOpen.size() + numOpen + reservedReaders.size() > maxOpen) {
+          filesToClose =
+              takeLRUOpenFiles((filesToOpen.size() + numOpen + reservedReaders.size()) - maxOpen);
+        }
+      }
+    }
+
+    readersReserved.forEach((k, v) -> k.setCacheProvider(cacheProvider));
+
+    // close files before opening files to ensure we stay under resource
+    // limitations
+    closeReaders(filesToClose);
+
+    // open any files that need to be opened
+    for (String file : filesToOpen) {
+      try {
+        if (!file.contains(":")) {
+          throw new IllegalArgumentException("Expected uri, got : " + file);
+        }
+        Path path = new Path(file);
+        FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
+        // log.debug("Opening "+file + " path " + path);
+        var tableConf = context.getTableConfiguration(tablet.tableId());
+        FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
+            .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService())
+            .withTableConfiguration(tableConf).withCacheProvider(cacheProvider)
+            .withFileLenCache(fileLenCache).build();
+        readersReserved.put(reader, file);
+      } catch (Exception e) {
+
+        ProblemReports.getInstance(context)
+            .report(new ProblemReport(tablet.tableId(), ProblemType.FILE_READ, file, e));
+
+        if (continueOnFailure) {
+          // release the permit for the file that failed to open
+          if (!tablet.isMeta()) {
+            filePermits.release(1);
+          }
+          log.warn("Failed to open file {} {} continuing...", file, e.getMessage(), e);
+        } else {
+          // close whatever files were opened
+          closeReaders(readersReserved.keySet());
+
+          if (!tablet.isMeta()) {
+            filePermits.release(files.size());
+          }
+
+          log.error("Failed to open file {} {}", file, e.getMessage());
+          throw new IOException("Failed to open " + file, e);
+        }
+      }
+    }
+
+    synchronized (this) {
+      // update set of reserved readers
+      reservedReaders.putAll(readersReserved);
+    }
+
+    return readersReserved;
+  }
+
+  private void releaseReaders(KeyExtent tablet, List<FileSKVIterator> readers,
+      boolean sawIOException) {
+    // put files in openFiles
+
+    synchronized (this) {
+
+      // check that readers were actually reserved ... want to make sure a thread does
+      // not try to release readers they never reserved
+      if (!reservedReaders.keySet().containsAll(readers)) {
+        throw new IllegalArgumentException("Asked to release readers that were never reserved ");
+      }
+
+      for (FileSKVIterator reader : readers) {
+        try {
+          reader.closeDeepCopies();
+        } catch (IOException e) {
+          log.warn("{}", e.getMessage(), e);
+          sawIOException = true;
+        }
+      }
+
+      for (FileSKVIterator reader : readers) {
+        String fileName = reservedReaders.remove(reader);
+        if (!sawIOException) {
+          openFiles.computeIfAbsent(fileName, k -> new ArrayList<>())
+              .add(new OpenReader(fileName, reader));
+        }
+      }
+    }
+
+    if (sawIOException) {
+      closeReaders(readers);
+    }
+
+    // decrement the semaphore
+    if (!tablet.isMeta()) {
+      filePermits.release(readers.size());
+    }
+
+  }
+
+  static class FileDataSource implements DataSource {
+
+    private SortedKeyValueIterator<Key,Value> iter;
+    private ArrayList<FileDataSource> deepCopies;
+    private boolean current = true;
+    private IteratorEnvironment env;
+    private String file;
+    private AtomicBoolean iflag;
+
+    FileDataSource(String file, SortedKeyValueIterator<Key,Value> iter) {
+      this.file = file;
+      this.iter = iter;
+      this.deepCopies = new ArrayList<>();
+    }
+
+    public FileDataSource(IteratorEnvironment env, SortedKeyValueIterator<Key,Value> deepCopy,
+        ArrayList<FileDataSource> deepCopies) {
+      this.iter = deepCopy;
+      this.env = env;
+      this.deepCopies = deepCopies;
+      deepCopies.add(this);
+    }
+
+    @Override
+    public boolean isCurrent() {
+      return current;
+    }
+
+    @Override
+    public DataSource getNewDataSource() {
+      current = true;
+      return this;
+    }
+
+    @Override
+    public DataSource getDeepCopyDataSource(IteratorEnvironment env) {
+      return new FileDataSource(env, iter.deepCopy(env), deepCopies);
+    }
+
+    @Override
+    public SortedKeyValueIterator<Key,Value> iterator() {
+      return iter;
+    }
+
+    void unsetIterator() {
+      current = false;
+      iter = null;
+      for (FileDataSource fds : deepCopies) {
+        fds.current = false;
+        fds.iter = null;
+      }
+    }
+
+    void setIterator(SortedKeyValueIterator<Key,Value> iter) {
+      current = false;
+      this.iter = iter;
+
+      if (iflag != null) {
+        ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
+      }
+
+      for (FileDataSource fds : deepCopies) {
+        fds.current = false;
+        fds.iter = iter.deepCopy(fds.env);
+      }
+    }
+
+    @Override
+    public void setInterruptFlag(AtomicBoolean flag) {
+      this.iflag = flag;
+      ((InterruptibleIterator) this.iter).setInterruptFlag(iflag);
+    }
+  }
+
+  public class ScanFileManager {
+
+    private ArrayList<FileDataSource> dataSources;
+    private ArrayList<FileSKVIterator> tabletReservedReaders;
+    private KeyExtent tablet;
+    private boolean continueOnFailure;
+    private CacheProvider cacheProvider;
+
+    ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
+      tabletReservedReaders = new ArrayList<>();
+      dataSources = new ArrayList<>();
+      this.tablet = tablet;
+      this.cacheProvider = cacheProvider;
+
+      continueOnFailure = context.getTableConfiguration(tablet.tableId())
+          .getBoolean(Property.TABLE_FAILURES_IGNORE);
+
+      if (tablet.isMeta()) {
+        continueOnFailure = false;
+      }
+    }
+
+    private Map<FileSKVIterator,String> openFiles(List<String> files)
+        throws TooManyFilesException, IOException {
+      // one tablet can not open more than maxOpen files, otherwise it could get stuck
+      // forever waiting on itself to release files
+
+      if (tabletReservedReaders.size() + files.size() >= maxOpen) {
+        throw new TooManyFilesException(
+            "Request to open files would exceed max open files reservedReaders.size()="
+                + tabletReservedReaders.size() + " files.size()=" + files.size() + " maxOpen="
+                + maxOpen + " tablet = " + tablet);
+      }
+
+      Map<FileSKVIterator,String> newlyReservedReaders =
+          reserveReaders(tablet, files, continueOnFailure, cacheProvider);
+
+      tabletReservedReaders.addAll(newlyReservedReaders.keySet());
+      return newlyReservedReaders;
+    }
+
+    public synchronized List<InterruptibleIterator> openFiles(Map<TabletFile,DataFileValue> files,
+        boolean detachable, SamplerConfigurationImpl samplerConfig) throws IOException {
+
+      Map<FileSKVIterator,String> newlyReservedReaders = openFiles(
+          files.keySet().stream().map(TabletFile::getPathStr).collect(Collectors.toList()));
+
+      ArrayList<InterruptibleIterator> iters = new ArrayList<>();
+
+      boolean sawTimeSet = files.values().stream().anyMatch(DataFileValue::isTimeSet);
+
+      for (Entry<FileSKVIterator,String> entry : newlyReservedReaders.entrySet()) {
+        FileSKVIterator source = entry.getKey();
+        String filename = entry.getValue();
+        InterruptibleIterator iter;
+
+        if (samplerConfig != null) {
+          source = source.getSample(samplerConfig);
+          if (source == null) {
+            throw new SampleNotPresentException();
+          }
+        }
+
+        iter = new ProblemReportingIterator(context, tablet.tableId(), filename, continueOnFailure,
+            detachable ? getSsi(filename, source) : source);
+
+        if (sawTimeSet) {
+          // constructing FileRef is expensive so avoid if not needed
+          DataFileValue value = files.get(new TabletFile(new Path(filename)));
+          if (value.isTimeSet()) {
+            iter = new TimeSettingIterator(iter, value.getTime());
+          }
+        }
+
+        iters.add(iter);
+      }
+
+      return iters;
+    }
+
+    private SourceSwitchingIterator getSsi(String filename, FileSKVIterator source) {
+      FileDataSource fds = new FileDataSource(filename, source);
+      dataSources.add(fds);
+      return new SourceSwitchingIterator(fds);
+    }
+
+    public synchronized void detach() {
+
+      releaseReaders(tablet, tabletReservedReaders, false);
+      tabletReservedReaders.clear();
+
+      for (FileDataSource fds : dataSources) {
+        fds.unsetIterator();
+      }
+    }
+
+    public synchronized void reattach(SamplerConfigurationImpl samplerConfig) throws IOException {
+      if (!tabletReservedReaders.isEmpty()) {
+        throw new IllegalStateException();
+      }
+
+      List<String> files = dataSources.stream().map(x -> x.file).collect(Collectors.toList());
+      Map<FileSKVIterator,String> newlyReservedReaders = openFiles(files);
+      Map<String,List<FileSKVIterator>> map = new HashMap<>();
+      newlyReservedReaders.forEach(
+          (reader, fileName) -> map.computeIfAbsent(fileName, k -> new LinkedList<>()).add(reader));
+
+      for (FileDataSource fds : dataSources) {
+        FileSKVIterator source = map.get(fds.file).remove(0);
+        if (samplerConfig != null) {
+          source = source.getSample(samplerConfig);
+          if (source == null) {
+            throw new SampleNotPresentException();
+          }
+        }
+        fds.setIterator(source);
+      }
+    }
+
+    public synchronized void releaseOpenFiles(boolean sawIOException) {
+      releaseReaders(tablet, tabletReservedReaders, sawIOException);
+      tabletReservedReaders.clear();
+      dataSources.clear();
+    }
+
+    public synchronized int getNumOpenFiles() {
+      return tabletReservedReaders.size();
+    }
+  }
+
+  public ScanFileManager newScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) {
+    return new ScanFileManager(tablet, cacheProvider);
+  }
+}
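
For readers following the functional change carried by this merge (switching filePermits from new Semaphore(maxOpen, false) to new Semaphore(maxOpen, true), plus the louder warning when permit acquisition is slow), the standalone sketch below illustrates the fairness semantics of java.util.concurrent.Semaphore that the new comment refers to. This is not Accumulo code; the class, method, and thread names are invented for illustration, and the permit counts and timeouts are arbitrary.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Sketch showing why a fair Semaphore helps the multi-permit requests made by
 * FileManager: with fairness off, a free permit can be grabbed by a late
 * single-permit arrival ("barging") instead of being held for the thread at
 * the head of the wait queue, so a large request can be starved under a
 * steady stream of small ones.
 */
public class FairSemaphoreDemo {

  private static void demo(boolean fair) throws InterruptedException {
    Semaphore permits = new Semaphore(2, fair);
    permits.acquireUninterruptibly(1); // main holds one permit, one remains free

    // A "large" request for both permits; it must wait for main's permit.
    Thread large = new Thread(() -> {
      permits.acquireUninterruptibly(2);
      System.out.println("  large request finally got 2 permits");
      permits.release(2);
    }, "large-request");
    large.start();
    TimeUnit.MILLISECONDS.sleep(100); // let the large request enqueue first

    // A later "small" request for the single free permit.
    // fair = false: it grabs the free permit immediately, jumping the queue.
    // fair = true : it queues behind the large request and times out here.
    boolean gotIt = permits.tryAcquire(1, 500, TimeUnit.MILLISECONDS);
    System.out.println("fair=" + fair + ": small request acquired immediately? " + gotIt);
    if (gotIt) {
      permits.release(1);
    }

    permits.release(1); // return main's permit so the large request can finish
    large.join();
  }

  public static void main(String[] args) throws InterruptedException {
    demo(false); // unfair: the small latecomer jumps ahead of the large waiter
    demo(true);  // fair: the small latecomer waits behind the large waiter
  }
}

Note that the timed tryAcquire used above honors the fairness setting, which is what makes the two runs differ; the zero-timeout tryAcquire() always barges regardless of fairness.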
