http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env.h b/be/src/kudu/util/env.h
new file mode 100644
index 0000000..7f06c4e
--- /dev/null
+++ b/be/src/kudu/util/env.h
@@ -0,0 +1,643 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// An Env is an interface used by the kudu implementation to access
+// operating system functionality like the filesystem etc.  Callers
+// may wish to provide a custom Env object when opening a database to
+// get fine-grained control; e.g., to rate limit file system operations.
+//
+// All Env implementations are safe for concurrent access from
+// multiple threads without any external synchronization.
+
+#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
+#define STORAGE_LEVELDB_INCLUDE_ENV_H_
+
+#include <cstdarg>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/callback_forward.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class faststring;
+class FileLock;
+class RandomAccessFile;
+class RWFile;
+class SequentialFile;
+class Slice;
+class WritableFile;
+
+struct RandomAccessFileOptions;
+struct RWFileOptions;
+struct WritableFileOptions;
+
+// Returned by Env::GetSpaceInfo().
+struct SpaceInfo {
+  int64_t capacity_bytes; // Capacity of a filesystem, in bytes.
+  int64_t free_bytes;     // Bytes available to non-privileged processes.
+};
+
+class Env {
+ public:
+  // Governs if/how the file is created.
+  //
+  // enum value                      | file exists       | file does not exist
+  // --------------------------------+-------------------+--------------------
+  // CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates
+  // CREATE_NON_EXISTING             | fails             | creates
+  // OPEN_EXISTING                   | opens             | fails
+  enum CreateMode {
+    CREATE_IF_NON_EXISTING_TRUNCATE,
+    CREATE_NON_EXISTING,
+    OPEN_EXISTING
+  };
+
+  Env() { }
+  virtual ~Env();
+
+  // Return a default environment suitable for the current operating
+  // system.  Sophisticated users may wish to provide their own Env
+  // implementation instead of relying on this default environment.
+  //
+  // The result of Default() belongs to kudu and must never be deleted.
+  static Env* Default();
+
+  // Open the existing file with the specified name for sequential reading.
+  // On success, stores a pointer to the new file in *result and returns OK.
+  // On failure stores NULL in *result and returns non-OK.  If the file does
+  // not exist, returns a non-OK status.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   std::unique_ptr<SequentialFile>* result) = 0;
+
+  // Open the existing file with the specified name for read-only random
+  // access.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.  If the file does not exist, returns a non-OK
+  // status.
+  //
+  // The returned file may be concurrently accessed by multiple threads.
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* result) = 0;
+
+  // Like the previous NewRandomAccessFile, but allows options to be specified.
+  virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts,
+                                     const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* result) = 0;
+
+  // Create an object that writes to a new file with the specified
+  // name.  Deletes any existing file with the same name and creates a
+  // new file.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewWritableFile(const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+
+  // Like the previous NewWritableFile, but allows options to be
+  // specified.
+  virtual Status NewWritableFile(const WritableFileOptions& opts,
+                                 const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+  // Creates a new WritableFile provided the name_template parameter.
+  // The last six characters of name_template must be "XXXXXX" and these are
+  // replaced with a string that makes the filename unique.
+  // The resulting created filename, if successful, will be stored in the
+  // created_filename out parameter.
+  // The file is created with permissions 0600, that is, read plus write for
+  // owner only. The implementation will create the file in a secure manner,
+  // and will return an error Status if it is unable to open the file.
+  virtual Status NewTempWritableFile(const WritableFileOptions& opts,
+                                     const std::string& name_template,
+                                     std::string* created_filename,
+                                     std::unique_ptr<WritableFile>* result) = 0;
+
+  // Creates a new readable and writable file. If a file with the same name
+  // already exists on disk, it is deleted.
+  //
+  // Some of the methods of the new file may be accessed concurrently,
+  // while others are only safe for access by one thread at a time.
+  virtual Status NewRWFile(const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Like the previous NewRWFile, but allows options to be specified.
+  virtual Status NewRWFile(const RWFileOptions& opts,
+                           const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Same as NewTempWritableFile() above, but for an RWFile.
+  virtual Status NewTempRWFile(const RWFileOptions& opts,
+                               const std::string& name_template,
+                               std::string* created_filename,
+                               std::unique_ptr<RWFile>* res) = 0;
+
+  // Returns true iff the named file exists.
+  virtual bool FileExists(const std::string& fname) = 0;
+
+  // Store in *result the names of the children of the specified directory.
+  // The names are relative to "dir".
+  // Original contents of *result are dropped.
+  virtual Status GetChildren(const std::string& dir,
+                             std::vector<std::string>* result) = 0;
+
+  // Delete the named file.
+  virtual Status DeleteFile(const std::string& fname) = 0;
+
+  // Create the specified directory.
+  virtual Status CreateDir(const std::string& dirname) = 0;
+
+  // Delete the specified directory.
+  virtual Status DeleteDir(const std::string& dirname) = 0;
+
+  // Return the current working directory.
+  virtual Status GetCurrentWorkingDir(std::string* cwd) const = 0;
+
+  // Change the current working directory.
+  virtual Status ChangeDir(const std::string& dest) = 0;
+
+  // Synchronize the entry for a specific directory.
+  virtual Status SyncDir(const std::string& dirname) = 0;
+
+  // Recursively delete the specified directory.
+  // This should operate safely, not following any symlinks, etc.
+  virtual Status DeleteRecursively(const std::string &dirname) = 0;
+
+  // Store the logical size of fname in *file_size.
+  virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
+
+  // Store the physical size of fname in *file_size.
+  //
+  // This differs from GetFileSize() in that it returns the actual amount
+  // of space consumed by the file, not the user-facing file size.
+  virtual Status GetFileSizeOnDisk(const std::string& fname, uint64_t* file_size) = 0;
+
+  // Walk 'root' recursively, looking up the amount of space used by each file
+  // as reported by GetFileSizeOnDisk(), storing the grand total in 'bytes_used'.
+  virtual Status GetFileSizeOnDiskRecursively(const std::string& root, uint64_t* bytes_used) = 0;
+
+  // Returns the modified time of the file in microseconds.
+  //
+  // The timestamp is a 'system' timestamp, and is not guaranteed to be
+  // monotonic, or have any other consistency properties. The granularity of the
+  // timestamp is not guaranteed, and may be as high as 1 second on some
+  // platforms. The timestamp is not guaranteed to be anchored to any particular
+  // epoch.
+  virtual Status GetFileModifiedTime(const std::string& fname, int64_t* timestamp) = 0;
+
+  // Store the block size of the filesystem where fname resides in
+  // *block_size. fname must exist but it may be a file or a directory.
+  virtual Status GetBlockSize(const std::string& fname, uint64_t* block_size) = 0;
+
+  // Determine the capacity and number of bytes free on the filesystem
+  // specified by 'path'. "Free space" accounting on the underlying filesystem
+  // may be more coarse than single bytes.
+  virtual Status GetSpaceInfo(const std::string& path, SpaceInfo* space_info) = 0;
+
+  // Rename file src to target.
+  virtual Status RenameFile(const std::string& src,
+                            const std::string& target) = 0;
+
+  // Lock the specified file.  Used to prevent concurrent access to
+  // the same db by multiple processes.  On failure, stores NULL in
+  // *lock and returns non-OK.
+  //
+  // On success, stores a pointer to the object that represents the
+  // acquired lock in *lock and returns OK.  The caller should call
+  // UnlockFile(*lock) to release the lock.  If the process exits,
+  // the lock will be automatically released.
+  //
+  // If somebody else already holds the lock, finishes immediately
+  // with a failure.  I.e., this call does not wait for existing locks
+  // to go away.
+  //
+  // May create the named file if it does not already exist.
+  virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
+
+  // Release the lock acquired by a previous successful call to LockFile.
+  // REQUIRES: lock was returned by a successful LockFile() call
+  // REQUIRES: lock has not already been unlocked.
+  virtual Status UnlockFile(FileLock* lock) = 0;
+
+  // *path is set to a temporary directory that can be used for testing. It may
+  // or may not have just been created. The directory may or may not differ
+  // between runs of the same process, but subsequent calls will return the
+  // same directory.
+  virtual Status GetTestDirectory(std::string* path) = 0;
+
+  // Returns the number of micro-seconds since some fixed point in time. Only
+  // useful for computing deltas of time.
+  virtual uint64_t NowMicros() = 0;
+
+  // Sleep/delay the thread for the prescribed number of micro-seconds.
+  virtual void SleepForMicroseconds(int micros) = 0;
+
+  // Get caller's thread id.
+  virtual uint64_t gettid() = 0;
+
+  // Return the full path of the currently running executable.
+  virtual Status GetExecutablePath(std::string* path) = 0;
+
+  // Checks if the file is a directory. Returns an error if it doesn't
+  // exist, otherwise writes true or false into 'is_dir' appropriately.
+  virtual Status IsDirectory(const std::string& path, bool* is_dir) = 0;
+
+  // The kind of file found during a walk. Note that symbolic links are
+  // reported as FILE_TYPE.
+  enum FileType {
+    DIRECTORY_TYPE,
+    FILE_TYPE,
+  };
+
+  // Called for each file/directory in the walk.
+  //
+  // The first argument is the type of file.
+  // The second is the dirname of the file.
+  // The third is the basename of the file.
+  //
+  // Returning an error won't halt the walk, but it will cause it to return
+  // with an error status when it's done.
+  typedef Callback<Status(FileType, const std::string&, const std::string&)> WalkCallback;
+
+  // Whether to walk directories in pre-order or post-order.
+  enum DirectoryOrder {
+    PRE_ORDER,
+    POST_ORDER,
+  };
+
+  // Walk the filesystem subtree from 'root' down, invoking 'cb' for each
+  // file or directory found, including 'root'.
+  //
+  // The walk will not cross filesystem boundaries. It won't change the
+  // working directory, nor will it follow symbolic links.
+  virtual Status Walk(const std::string& root,
+                      DirectoryOrder order,
+                      const WalkCallback& cb) = 0;
+
+  // Finds paths on the filesystem matching a pattern.
+  //
+  // The found pathnames are added to the 'paths' vector. If no pathnames are
+  // found matching the pattern, no paths are added to the vector and an OK
+  // status is returned.
+  virtual Status Glob(const std::string& path_pattern, std::vector<std::string>* paths) = 0;
+
+  // Canonicalize 'path' by applying the following conversions:
+  // - Converts a relative path into an absolute one using the cwd.
+  // - Converts '.' and '..' references.
+  // - Resolves all symbolic links.
+  //
+  // All directory entries in 'path' must exist on the filesystem.
+  virtual Status Canonicalize(const std::string& path, std::string* result) = 0;
+
+  // Get the total amount of RAM installed on this machine.
+  virtual Status GetTotalRAMBytes(int64_t* ram) = 0;
+
+  // Get the max number of file descriptors that this process can open.
+  virtual int64_t GetOpenFileLimit() = 0;
+
+  // Increase the max number of file descriptors that this process can open as
+  // much as possible. On UNIX platforms, this means increasing the
+  // RLIMIT_NOFILE resource soft limit (the limit actually enforced by the
+  // kernel) to be equal to the hard limit.
+  virtual void IncreaseOpenFileLimit() = 0;
+
+  // Checks whether the given path resides on an ext2, ext3, or ext4
+  // filesystem.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsOnExtFilesystem(const std::string& path, bool* result) = 0;
+
+  // Gets the kernel release string for this machine.
+  virtual std::string GetKernelRelease() = 0;
+
+  // Ensure that the file with the given path has permissions which adhere
+  // to the current configured umask (from flags.h). If the permissions are
+  // wider than the current umask, then a warning is logged and the permissions
+  // are fixed.
+  //
+  // Returns a bad Status if the file does not exist or the permissions cannot
+  // be changed.
+  virtual Status EnsureFileModeAdheresToUmask(const std::string& path) = 0;
+
+  // Special string injected into file-growing operations' random failures
+  // (if enabled).
+  //
+  // Only useful for tests.
+  static const char* const kInjectedFailureStatusMsg;
+
+ private:
+  // No copying allowed
+  Env(const Env&);
+  void operator=(const Env&);
+};
+
+// A file abstraction for reading sequentially through a file
+class SequentialFile {
+ public:
+  SequentialFile() { }
+  virtual ~SequentialFile();
+
+  // Read up to "result.size" bytes from the file.
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status
+  // and the contents of "result" are invalid.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Read(Slice* result) = 0;
+
+  // Skip "n" bytes from the file. This is guaranteed to be no
+  // slower than reading the same data, but may be faster.
+  //
+  // If end of file is reached, skipping will stop at the end of the
+  // file, and Skip will return OK.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Skip(uint64_t n) = 0;
+
+  // Returns the filename provided when the SequentialFile was constructed.
+  virtual const std::string& filename() const = 0;
+};
+
+// A file abstraction for randomly reading the contents of a file.
+class RandomAccessFile {
+ public:
+  RandomAccessFile() { }
+  virtual ~RandomAccessFile();
+
+  // Read up to "result.size" bytes from the file starting at "offset".
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice* result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'.
+  // Sets each "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, std::vector<Slice>* results) const = 0;
+
+  // Stores the size of the file in *size.
+  virtual Status Size(uint64_t *size) const = 0;
+
+  // Returns the filename provided when the RandomAccessFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+  // Returns the approximate memory usage of this RandomAccessFile including
+  // the object itself.
+  virtual size_t memory_footprint() const = 0;
+};
+
+// Creation-time options for WritableFile
+struct WritableFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  WritableFileOptions()
+    : sync_on_close(false),
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+};
+
+// Options specified when a file is opened for random access.
+struct RandomAccessFileOptions {
+  RandomAccessFileOptions() {}
+};
+
+// A file abstraction for sequential writing.  The implementation
+// must provide buffering since callers may append small fragments
+// at a time to the file.
+class WritableFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  WritableFile() { }
+  virtual ~WritableFile();
+
+  virtual Status Append(const Slice& data) = 0;  // Appends 'data' to the end of the file.
+
+  // If possible, uses scatter-gather I/O to efficiently append
+  // multiple buffers to a file. Otherwise, falls back to regular I/O.
+  //
+  // For implementation specific quirks and details, see comments in
+  // implementation source code (e.g., env_posix.cc)
+  virtual Status AppendV(const std::vector<Slice>& data) = 0;
+
+  // Pre-allocates 'size' bytes for the file in the underlying filesystem.
+  // 'size' bytes are added to the current pre-allocated size or to the
+  // current offset, whichever is bigger. In no case is the file truncated by
+  // this operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks.
+  //
+  // In no case is the file truncated by this operation.
+  virtual Status PreAllocate(uint64_t size) = 0;
+
+  virtual Status Close() = 0;
+
+  // Flush all dirty data (not metadata) to disk.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode) = 0;
+
+  virtual Status Sync() = 0;
+
+  virtual uint64_t Size() const = 0;
+
+  // Returns the filename provided when the WritableFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  // No copying allowed
+  WritableFile(const WritableFile&);
+  void operator=(const WritableFile&);
+};
+
+// Creation-time options for RWFile
+struct RWFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  RWFileOptions()
+    : sync_on_close(false),
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+};
+
+// A file abstraction for both reading and writing. No notion of a built-in
+// file offset is ever used; instead, all operations must provide an
+// explicit offset.
+//
+// All operations are safe for concurrent use by multiple threads (unless
+// noted otherwise) bearing in mind the usual filesystem coherency guarantees
+// (e.g. two threads that write concurrently to the same file offset will
+// probably yield garbage).
+class RWFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  RWFile() {
+  }
+
+  virtual ~RWFile();
+
+  // Read up to "result.size" bytes from the file starting at "offset".
+  // Sets "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice* result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'.
+  // Sets each "result.data" to the data that was read.
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read all of the requested bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, std::vector<Slice>* results) const = 0;
+
+  // Writes 'data' to the file position given by 'offset'.
+  virtual Status Write(uint64_t offset, const Slice& data) = 0;
+
+  // Writes the 'data' vector to the file position given by 'offset'.
+  virtual Status WriteV(uint64_t offset, const std::vector<Slice>& data) = 0;
+
+  // Preallocates 'length' bytes for the file in the underlying filesystem
+  // beginning at 'offset'. It is safe to preallocate the same range
+  // repeatedly; this is an idempotent operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks. On such implementations, this is much faster than
+  // using Truncate() to increase the file size.
+  //
+  // In no case is the file truncated by this operation.
+  //
+  // 'mode' controls whether the file's logical size grows to include the
+  // preallocated space, or whether it remains the same.
+  enum PreAllocateMode {
+    CHANGE_FILE_SIZE,
+    DONT_CHANGE_FILE_SIZE
+  };
+  virtual Status PreAllocate(uint64_t offset,
+                             size_t length,
+                             PreAllocateMode mode) = 0;
+
+  // Truncate the file to 'length' bytes. If 'length' is less than the previous
+  // file size, the extra data will be lost. If 'length' is greater than the
+  // previous file size, the file length is extended, and the extended portion
+  // will contain null bytes ('\0').
+  virtual Status Truncate(uint64_t length) = 0;
+
+  // Deallocates space given by 'offset' and 'length' from the file,
+  // effectively "punching a hole" in it. The space will be reclaimed by
+  // the filesystem and reads to that range will return zeroes. Useful
+  // for making whole files sparse.
+  //
+  // Filesystems that don't implement this will return an error.
+  virtual Status PunchHole(uint64_t offset, size_t length) = 0;
+
+  // Flushes the range of dirty data (not metadata) given by 'offset' and
+  // 'length' to disk. If length is 0, all bytes from 'offset' to the end
+  // of the file are flushed.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) = 0;
+
+  // Synchronously flushes all dirty file data and metadata to disk. Upon
+  // returning successfully, all previously issued file changes have been
+  // made durable.
+  virtual Status Sync() = 0;
+
+  // Closes the file, optionally calling Sync() on it if the file was
+  // created with the sync_on_close option enabled.
+  //
+  // Not thread-safe.
+  virtual Status Close() = 0;
+
+  // Retrieves the file's size.
+  virtual Status Size(uint64_t* size) const = 0;
+
+  // Retrieve a map of the file's live extents.
+  //
+  // Each map entry is an offset and size representing a section of live file
+  // data. Any byte offset not contained in a map entry implicitly belongs to a
+  // "hole" in the (sparse) file.
+  //
+  // If the underlying filesystem does not support extents, map entries
+  // represent runs of adjacent fixed-size filesystem blocks instead. If the
+  // platform doesn't support fetching extents at all, a NotSupported status
+  // will be returned.
+  typedef std::map<uint64_t, uint64_t> ExtentMap;
+  virtual Status GetExtentMap(ExtentMap* out) const = 0;
+
+  // Returns the filename provided when the RWFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(RWFile);
+};
+
+// Identifies a locked file.
+class FileLock {
+ public:
+  FileLock() { }
+  virtual ~FileLock();
+ private:
+  // No copying allowed
+  FileLock(const FileLock&);
+  void operator=(const FileLock&);
+};
+
+// A utility routine: write "data" to the named file.
+extern Status WriteStringToFile(Env* env, const Slice& data,
+                                const std::string& fname);
+
+// A utility routine: read contents of named file into *data
+extern Status ReadFileToString(Env* env, const std::string& fname,
+                               faststring* data);
+
+}  // namespace kudu
+
+#endif  // STORAGE_LEVELDB_INCLUDE_ENV_H_
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_posix.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/env_posix.cc b/be/src/kudu/util/env_posix.cc new file mode 100644 index 0000000..a3998fc --- /dev/null +++ b/be/src/kudu/util/env_posix.cc @@ -0,0 +1,1608 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include <dirent.h> +#include <errno.h> +#include <fcntl.h> +#include <fts.h> +#include <glob.h> +#include <limits.h> +#include <pthread.h> +#include <sys/mman.h> +#include <sys/resource.h> +#include <sys/stat.h> +#include <sys/statvfs.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <sys/utsname.h> +#include <unistd.h> + +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <ctime> +#include <memory> +#include <numeric> +#include <string> +#include <type_traits> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/bind.h" +#include "kudu/gutil/callback.h" +#include "kudu/gutil/map-util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/atomic.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/env.h" +#include "kudu/util/errno.h" +#include "kudu/util/flags.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/fault_injection.h" +#include "kudu/util/logging.h" +#include "kudu/util/malloc.h" +#include "kudu/util/monotime.h" +#include "kudu/util/path_util.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/slice.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/thread_restrictions.h" +#include "kudu/util/trace.h" + +#if defined(__APPLE__) +#include <mach-o/dyld.h> +#include <sys/sysctl.h> +#else +#include <linux/falloc.h> +#include 
<linux/fiemap.h> +#include <linux/fs.h> +#include <linux/magic.h> +#include <sys/ioctl.h> +#include <sys/sysinfo.h> +#include <sys/vfs.h> +#endif // defined(__APPLE__) + +// Copied from falloc.h. Useful for older kernels that lack support for +// hole punching; fallocate(2) will return EOPNOTSUPP. +#ifndef FALLOC_FL_KEEP_SIZE +#define FALLOC_FL_KEEP_SIZE 0x01 /* default is extend size */ +#endif +#ifndef FALLOC_FL_PUNCH_HOLE +#define FALLOC_FL_PUNCH_HOLE 0x02 /* de-allocates range */ +#endif + +// For platforms without fdatasync (like OS X) +#ifndef fdatasync +#define fdatasync fsync +#endif + +// For platforms without unlocked_stdio (like OS X) +#ifndef fread_unlocked +#define fread_unlocked fread +#endif + +// Retry on EINTR for functions like read() that return -1 on error. +#define RETRY_ON_EINTR(err, expr) do { \ + static_assert(std::is_signed<decltype(err)>::value == true, \ + #err " must be a signed integer"); \ + (err) = (expr); \ +} while ((err) == -1 && errno == EINTR) + +// Same as the above, but for stream API calls like fread() and fwrite(). +#define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \ + static_assert(std::is_unsigned<decltype(nread)>::value == true, \ + #nread " must be an unsigned integer"); \ + (nread) = (expr); \ +} while ((nread) == 0 && ferror(stream) == EINTR) + +// See KUDU-588 for details. +DEFINE_bool(env_use_fsync, false, + "Use fsync(2) instead of fdatasync(2) for synchronizing dirty " + "data to disk."); +TAG_FLAG(env_use_fsync, advanced); +TAG_FLAG(env_use_fsync, evolving); + +DEFINE_bool(suicide_on_eio, true, + "Kill the process if an I/O operation results in EIO"); +TAG_FLAG(suicide_on_eio, advanced); + +DEFINE_bool(never_fsync, false, + "Never fsync() anything to disk. This is used by certain test cases to " + "speed up runtime. 
This is very unsafe to use in production."); +TAG_FLAG(never_fsync, advanced); +TAG_FLAG(never_fsync, unsafe); + +DEFINE_double(env_inject_io_error, 0.0, + "Fraction of the time that certain I/O operations will fail"); +TAG_FLAG(env_inject_io_error, hidden); + +DEFINE_int32(env_inject_short_read_bytes, 0, + "The number of bytes less than the requested bytes to read"); +TAG_FLAG(env_inject_short_read_bytes, hidden); +DEFINE_int32(env_inject_short_write_bytes, 0, + "The number of bytes less than the requested bytes to write"); +TAG_FLAG(env_inject_short_write_bytes, hidden); + +using base::subtle::Atomic64; +using base::subtle::Barrier_AtomicIncrement; +using std::accumulate; +using std::string; +using std::unique_ptr; +using std::vector; +using strings::Substitute; + +static __thread uint64_t thread_local_id; +static Atomic64 cur_thread_local_id_; + +namespace kudu { + +const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE"; + +namespace { + +#if defined(__APPLE__) +// Simulates Linux's fallocate file preallocation API on OS X. +int fallocate(int fd, int mode, off_t offset, off_t len) { + CHECK_EQ(mode, 0); + off_t size = offset + len; + + struct stat stat; + int ret = fstat(fd, &stat); + if (ret < 0) { + return ret; + } + + if (stat.st_blocks * 512 < size) { + // The offset field seems to have no effect; the file is always allocated + // with space from 0 to the size. This is probably because OS X does not + // support sparse files. + fstore_t store = {F_ALLOCATECONTIG, F_PEOFPOSMODE, 0, size}; + if (fcntl(fd, F_PREALLOCATE, &store) < 0) { + LOG(INFO) << "Unable to allocate contiguous disk space, attempting non-contiguous allocation"; + store.fst_flags = F_ALLOCATEALL; + ret = fcntl(fd, F_PREALLOCATE, &store); + if (ret < 0) { + return ret; + } + } + } + + if (stat.st_size < size) { + // fcntl does not change the file size, so set it if necessary. 
+ int ret; + RETRY_ON_EINTR(ret, ftruncate(fd, size)); + return ret; + } + return 0; +} + +// Simulates Linux's preadv API on OS X. +ssize_t preadv(int fd, const struct iovec* iovec, int count, off_t offset) { + ssize_t total_read_bytes = 0; + for (int i = 0; i < count; i++) { + ssize_t r; + RETRY_ON_EINTR(r, pread(fd, iovec[i].iov_base, iovec[i].iov_len, offset)); + if (r < 0) { + return r; + } + total_read_bytes += r; + if (static_cast<size_t>(r) < iovec[i].iov_len) { + break; + } + offset += iovec[i].iov_len; + } + return total_read_bytes; +} + +// Simulates Linux's pwritev API on OS X. +ssize_t pwritev(int fd, const struct iovec* iovec, int count, off_t offset) { + ssize_t total_written_bytes = 0; + for (int i = 0; i < count; i++) { + ssize_t r; + RETRY_ON_EINTR(r, pwrite(fd, iovec[i].iov_base, iovec[i].iov_len, offset)); + if (r < 0) { + return r; + } + total_written_bytes += r; + if (static_cast<size_t>(r) < iovec[i].iov_len) { + break; + } + offset += iovec[i].iov_len; + } + return total_written_bytes; +} +#endif + + +// Close file descriptor when object goes out of scope. +class ScopedFdCloser { + public: + explicit ScopedFdCloser(int fd) + : fd_(fd) { + } + + ~ScopedFdCloser() { + ThreadRestrictions::AssertIOAllowed(); + int err = ::close(fd_); + if (PREDICT_FALSE(err != 0)) { + PLOG(WARNING) << "Failed to close fd " << fd_; + } + } + + private: + int fd_; +}; + +Status IOError(const std::string& context, int err_number) { + switch (err_number) { + case ENOENT: + return Status::NotFound(context, ErrnoToString(err_number), err_number); + case EEXIST: + return Status::AlreadyPresent(context, ErrnoToString(err_number), err_number); + case EOPNOTSUPP: + return Status::NotSupported(context, ErrnoToString(err_number), err_number); + case EIO: + if (FLAGS_suicide_on_eio) { + // TODO: This is very, very coarse-grained. A more comprehensive + // approach is described in KUDU-616. 
+ LOG(FATAL) << "Fatal I/O error, context: " << context; + } + } + return Status::IOError(context, ErrnoToString(err_number), err_number); +} + +Status DoSync(int fd, const string& filename) { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + ThreadRestrictions::AssertIOAllowed(); + if (FLAGS_never_fsync) return Status::OK(); + if (FLAGS_env_use_fsync) { + TRACE_COUNTER_SCOPE_LATENCY_US("fsync_us"); + TRACE_COUNTER_INCREMENT("fsync", 1); + if (fsync(fd) < 0) { + return IOError(filename, errno); + } + } else { + TRACE_COUNTER_INCREMENT("fdatasync", 1); + TRACE_COUNTER_SCOPE_LATENCY_US("fdatasync_us"); + if (fdatasync(fd) < 0) { + return IOError(filename, errno); + } + } + return Status::OK(); +} + +Status DoOpen(const string& filename, Env::CreateMode mode, int* fd) { + ThreadRestrictions::AssertIOAllowed(); + int flags = O_RDWR; + switch (mode) { + case Env::CREATE_IF_NON_EXISTING_TRUNCATE: + flags |= O_CREAT | O_TRUNC; + break; + case Env::CREATE_NON_EXISTING: + flags |= O_CREAT | O_EXCL; + break; + case Env::OPEN_EXISTING: + break; + default: + return Status::NotSupported(Substitute("Unknown create mode $0", mode)); + } + const int f = open(filename.c_str(), flags, 0666); + if (f < 0) { + return IOError(filename, errno); + } + *fd = f; + return Status::OK(); +} + +Status DoReadV(int fd, const string& filename, uint64_t offset, vector<Slice>* results) { + ThreadRestrictions::AssertIOAllowed(); + + // Convert the results into the iovec vector to request + // and calculate the total bytes requested + size_t bytes_req = 0; + size_t iov_size = results->size(); + struct iovec iov[iov_size]; + for (size_t i = 0; i < iov_size; i++) { + Slice& result = (*results)[i]; + bytes_req += result.size(); + iov[i] = { result.mutable_data(), result.size() }; + } + + uint64_t cur_offset = offset; + size_t completed_iov = 0; + size_t rem = bytes_req; + while (rem > 0) { + // Never request more than IOV_MAX in one request + size_t 
iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX)); + ssize_t r; + RETRY_ON_EINTR(r, preadv(fd, iov + completed_iov, iov_count, cur_offset)); + + // Fake a short read for testing + if (PREDICT_FALSE(FLAGS_env_inject_short_read_bytes > 0 && rem == bytes_req)) { + DCHECK_LT(FLAGS_env_inject_short_read_bytes, r); + r -= FLAGS_env_inject_short_read_bytes; + } + + if (PREDICT_FALSE(r < 0)) { + // An error: return a non-ok status. + return IOError(filename, errno); + } + if (PREDICT_FALSE(r == 0)) { + // EOF. + return Status::IOError( + Substitute("EOF trying to read $0 bytes at offset $1", bytes_req, offset)); + } + if (PREDICT_TRUE(r == rem)) { + // All requested bytes were read. This is almost always the case. + return Status::OK(); + } + DCHECK_LE(r, rem); + // Adjust iovec vector based on bytes read for the next request + ssize_t bytes_rem = r; + for (size_t i = completed_iov; i < iov_size; i++) { + if (bytes_rem >= iov[i].iov_len) { + // The full length of this iovec was read + completed_iov++; + bytes_rem -= iov[i].iov_len; + } else { + // Partially read this result. + // Adjust the iov_len and iov_base to request only the missing data. + iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem; + iov[i].iov_len -= bytes_rem; + break; // Don't need to adjust remaining iovec's + } + } + cur_offset += r; + rem -= r; + } + DCHECK_EQ(0, rem); + return Status::OK(); +} + +Status DoWriteV(int fd, const string& filename, uint64_t offset, + const vector<Slice>& data) { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + ThreadRestrictions::AssertIOAllowed(); + + // Convert the results into the iovec vector to request + // and calculate the total bytes requested. 
+ size_t bytes_req = 0; + size_t iov_size = data.size(); + struct iovec iov[iov_size]; + for (size_t i = 0; i < iov_size; i++) { + const Slice& result = data[i]; + bytes_req += result.size(); + iov[i] = { const_cast<uint8_t*>(result.data()), result.size() }; + } + + uint64_t cur_offset = offset; + size_t completed_iov = 0; + size_t rem = bytes_req; + while (rem > 0) { + // Never request more than IOV_MAX in one request. + size_t iov_count = std::min(iov_size - completed_iov, static_cast<size_t>(IOV_MAX)); + ssize_t w; + RETRY_ON_EINTR(w, pwritev(fd, iov + completed_iov, iov_count, cur_offset)); + + // Fake a short write for testing. + if (PREDICT_FALSE(FLAGS_env_inject_short_write_bytes > 0 && rem == bytes_req)) { + DCHECK_LT(FLAGS_env_inject_short_write_bytes, w); + w -= FLAGS_env_inject_short_read_bytes; + } + + if (PREDICT_FALSE(w < 0)) { + // An error: return a non-ok status. + return IOError(filename, errno); + } + + DCHECK_LE(w, rem); + + if (PREDICT_TRUE(w == rem)) { + // All requested bytes were read. This is almost always the case. + return Status::OK(); + } + // Adjust iovec vector based on bytes read for the next request. + ssize_t bytes_rem = w; + for (size_t i = completed_iov; i < iov_size; i++) { + if (bytes_rem >= iov[i].iov_len) { + // The full length of this iovec was written. + completed_iov++; + bytes_rem -= iov[i].iov_len; + } else { + // Partially wrote this result. + // Adjust the iov_len and iov_base to write only the missing data. + iov[i].iov_base = static_cast<uint8_t *>(iov[i].iov_base) + bytes_rem; + iov[i].iov_len -= bytes_rem; + break; // Don't need to adjust remaining iovec's. 
+ } + } + cur_offset += w; + rem -= w; + } + DCHECK_EQ(0, rem); + return Status::OK(); +} + +class PosixSequentialFile: public SequentialFile { + private: + std::string filename_; + FILE* file_; + + public: + PosixSequentialFile(std::string fname, FILE* f) + : filename_(std::move(fname)), file_(f) {} + virtual ~PosixSequentialFile() { fclose(file_); } + + virtual Status Read(Slice* result) OVERRIDE { + ThreadRestrictions::AssertIOAllowed(); + size_t r; + STREAM_RETRY_ON_EINTR(r, file_, fread_unlocked(result->mutable_data(), 1, + result->size(), file_)); + if (r < result->size()) { + if (feof(file_)) { + // We leave status as ok if we hit the end of the file. + // We need to adjust the slice size. + result->truncate(r); + } else { + // A partial read with an error: return a non-ok status. + return IOError(filename_, errno); + } + } + return Status::OK(); + } + + virtual Status Skip(uint64_t n) OVERRIDE { + TRACE_EVENT1("io", "PosixSequentialFile::Skip", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + if (fseek(file_, n, SEEK_CUR)) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + virtual const string& filename() const OVERRIDE { return filename_; } +}; + +// pread() based random-access +class PosixRandomAccessFile: public RandomAccessFile { + private: + std::string filename_; + int fd_; + + public: + PosixRandomAccessFile(std::string fname, int fd) + : filename_(std::move(fname)), fd_(fd) {} + virtual ~PosixRandomAccessFile() { close(fd_); } + + virtual Status Read(uint64_t offset, Slice* result) const OVERRIDE { + vector<Slice> results = { *result }; + return ReadV(offset, &results); + } + + virtual Status ReadV(uint64_t offset, vector<Slice>* results) const OVERRIDE { + return DoReadV(fd_, filename_, offset, results); + } + + virtual Status Size(uint64_t *size) const OVERRIDE { + TRACE_EVENT1("io", "PosixRandomAccessFile::Size", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + struct stat st; + if (fstat(fd_, 
&st) == -1) { + return IOError(filename_, errno); + } + *size = st.st_size; + return Status::OK(); + } + + virtual const string& filename() const OVERRIDE { return filename_; } + + virtual size_t memory_footprint() const OVERRIDE { + return kudu_malloc_usable_size(this) + filename_.capacity(); + } +}; + +// Use non-memory mapped POSIX files to write data to a file. +// +// TODO (perf) investigate zeroing a pre-allocated allocated area in +// order to further improve Sync() performance. +class PosixWritableFile : public WritableFile { + public: + PosixWritableFile(std::string fname, int fd, uint64_t file_size, + bool sync_on_close) + : filename_(std::move(fname)), + fd_(fd), + sync_on_close_(sync_on_close), + filesize_(file_size), + pre_allocated_size_(0), + pending_sync_(false) {} + + ~PosixWritableFile() { + if (fd_ >= 0) { + WARN_NOT_OK(Close(), "Failed to close " + filename_); + } + } + + virtual Status Append(const Slice& data) OVERRIDE { + return AppendV({ data }); + } + + virtual Status AppendV(const vector<Slice> &data) OVERRIDE { + ThreadRestrictions::AssertIOAllowed(); + RETURN_NOT_OK(DoWriteV(fd_, filename_, filesize_, data)); + // Calculate the amount of data written + size_t bytes_written = accumulate(data.begin(), data.end(), static_cast<size_t>(0), + [&](int sum, const Slice& curr) { + return sum + curr.size(); + }); + filesize_ += bytes_written; + pending_sync_ = true; + return Status::OK(); + } + + virtual Status PreAllocate(uint64_t size) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + uint64_t offset = std::max(filesize_, pre_allocated_size_); + if (fallocate(fd_, 0, offset, size) < 0) { + if (errno == EOPNOTSUPP) { + KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate()."; + } else if (errno == ENOSYS) { + KLOG_FIRST_N(WARNING, 1) << "The kernel 
does not implement fallocate()."; + } else { + return IOError(filename_, errno); + } + } + pre_allocated_size_ = offset + size; + return Status::OK(); + } + + virtual Status Close() OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixWritableFile::Close", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + Status s; + + // If we've allocated more space than we used, truncate to the + // actual size of the file and perform Sync(). + if (filesize_ < pre_allocated_size_) { + int ret; + RETRY_ON_EINTR(ret, ftruncate(fd_, filesize_)); + if (ret != 0) { + s = IOError(filename_, errno); + pending_sync_ = true; + } + } + + if (sync_on_close_) { + Status sync_status = Sync(); + if (!sync_status.ok()) { + LOG(ERROR) << "Unable to Sync " << filename_ << ": " << sync_status.ToString(); + if (s.ok()) { + s = sync_status; + } + } + } + + if (close(fd_) < 0) { + if (s.ok()) { + s = IOError(filename_, errno); + } + } + + fd_ = -1; + return s; + } + + virtual Status Flush(FlushMode mode) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixWritableFile::Flush", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); +#if defined(__linux__) + int flags = SYNC_FILE_RANGE_WRITE; + if (mode == FLUSH_SYNC) { + flags |= SYNC_FILE_RANGE_WAIT_BEFORE; + flags |= SYNC_FILE_RANGE_WAIT_AFTER; + } + if (sync_file_range(fd_, 0, 0, flags) < 0) { + return IOError(filename_, errno); + } +#else + if (mode == FLUSH_SYNC && fsync(fd_) < 0) { + return IOError(filename_, errno); + } +#endif + return Status::OK(); + } + + virtual Status Sync() OVERRIDE { + TRACE_EVENT1("io", "PosixWritableFile::Sync", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename_)) { + if (pending_sync_) { + pending_sync_ = false; + RETURN_NOT_OK(DoSync(fd_, 
filename_)); + } + } + return Status::OK(); + } + + virtual uint64_t Size() const OVERRIDE { + return filesize_; + } + + virtual const string& filename() const OVERRIDE { return filename_; } + + private: + const std::string filename_; + int fd_; + bool sync_on_close_; + uint64_t filesize_; + uint64_t pre_allocated_size_; + + bool pending_sync_; +}; + +class PosixRWFile : public RWFile { + public: + PosixRWFile(string fname, int fd, bool sync_on_close) + : filename_(std::move(fname)), + fd_(fd), + sync_on_close_(sync_on_close), + pending_sync_(false), + closed_(false) {} + + ~PosixRWFile() { + WARN_NOT_OK(Close(), "Failed to close " + filename_); + } + + virtual Status Read(uint64_t offset, Slice* result) const OVERRIDE { + vector<Slice> results = { *result }; + return ReadV(offset, &results); + } + + virtual Status ReadV(uint64_t offset, vector<Slice>* results) const OVERRIDE { + return DoReadV(fd_, filename_, offset, results); + } + + virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE { + return WriteV(offset, { data }); + } + + virtual Status WriteV(uint64_t offset, const vector<Slice> &data) OVERRIDE { + Status s = DoWriteV(fd_, filename_, offset, data); + pending_sync_.Store(true); + return s; + } + + virtual Status PreAllocate(uint64_t offset, + size_t length, + PreAllocateMode mode) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + int falloc_mode = 0; + if (mode == DONT_CHANGE_FILE_SIZE) { + falloc_mode = FALLOC_FL_KEEP_SIZE; + } + if (fallocate(fd_, falloc_mode, offset, length) < 0) { + if (errno == EOPNOTSUPP) { + KLOG_FIRST_N(WARNING, 1) << "The filesystem does not support fallocate()."; + } else if (errno == ENOSYS) { + KLOG_FIRST_N(WARNING, 1) << "The kernel does not implement fallocate()."; + } else { + return IOError(filename_, errno); + } + } + return 
Status::OK(); + } + + virtual Status Truncate(uint64_t length) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT2("io", "PosixRWFile::Truncate", "path", filename_, "length", length); + ThreadRestrictions::AssertIOAllowed(); + int ret; + RETRY_ON_EINTR(ret, ftruncate(fd_, length)); + if (ret != 0) { + int err = errno; + return Status::IOError(Substitute("Unable to truncate file $0", filename_), + Substitute("ftruncate() failed: $0", ErrnoToString(err)), + err); + } + return Status::OK(); + } + + virtual Status PunchHole(uint64_t offset, size_t length) OVERRIDE { +#if defined(__linux__) + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixRWFile::PunchHole", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + if (fallocate(fd_, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length) < 0) { + return IOError(filename_, errno); + } + return Status::OK(); +#else + return Status::NotSupported("Hole punching not supported on this platform"); +#endif + } + + virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixRWFile::Flush", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); +#if defined(__linux__) + int flags = SYNC_FILE_RANGE_WRITE; + if (mode == FLUSH_SYNC) { + flags |= SYNC_FILE_RANGE_WAIT_AFTER; + } + if (sync_file_range(fd_, offset, length, flags) < 0) { + return IOError(filename_, errno); + } +#else + if (mode == FLUSH_SYNC && fsync(fd_) < 0) { + return IOError(filename_, errno); + } +#endif + return Status::OK(); + } + + virtual Status Sync() OVERRIDE { + if (!pending_sync_.CompareAndSwap(true, false)) { + return Status::OK(); + } + + TRACE_EVENT1("io", "PosixRWFile::Sync", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + 
LOG_SLOW_EXECUTION(WARNING, 1000, Substitute("sync call for $0", filename())) { + RETURN_NOT_OK(DoSync(fd_, filename_)); + } + return Status::OK(); + } + + virtual Status Close() OVERRIDE { + if (closed_) { + return Status::OK(); + } + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT1("io", "PosixRWFile::Close", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + Status s; + + if (sync_on_close_) { + s = Sync(); + if (!s.ok()) { + LOG(ERROR) << "Unable to Sync " << filename_ << ": " << s.ToString(); + } + } + + if (close(fd_) < 0) { + if (s.ok()) { + s = IOError(filename_, errno); + } + } + + closed_ = true; + return s; + } + + virtual Status Size(uint64_t* size) const OVERRIDE { + TRACE_EVENT1("io", "PosixRWFile::Size", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + struct stat st; + if (fstat(fd_, &st) == -1) { + return IOError(filename_, errno); + } + *size = st.st_size; + return Status::OK(); + } + + virtual Status GetExtentMap(ExtentMap* out) const OVERRIDE { +#if !defined(__linux__) + return Status::NotSupported("GetExtentMap not supported on this platform"); +#else + TRACE_EVENT1("io", "PosixRWFile::GetExtentMap", "path", filename_); + ThreadRestrictions::AssertIOAllowed(); + + // This allocation size is arbitrary. + static const int kBufSize = 4096; + uint8_t buf[kBufSize] = { 0 }; + + struct fiemap* fm = reinterpret_cast<struct fiemap*>(buf); + struct fiemap_extent* fme = &fm->fm_extents[0]; + int avail_extents_in_buffer = (kBufSize - sizeof(*fm)) / sizeof(*fme); + bool saw_last_extent = false; + ExtentMap extents; + do { + // Fetch another block of extents. + fm->fm_length = FIEMAP_MAX_OFFSET; + fm->fm_extent_count = avail_extents_in_buffer; + if (ioctl(fd_, FS_IOC_FIEMAP, fm) == -1) { + return IOError(filename_, errno); + } + + // No extents returned, this file must have no extents. + if (fm->fm_mapped_extents == 0) { + break; + } + + // Parse the extent block. 
+ uint64_t last_extent_end_offset; + for (int i = 0; i < fm->fm_mapped_extents; i++) { + if (fme[i].fe_flags & FIEMAP_EXTENT_LAST) { + // This should really be the last extent. + CHECK_EQ(fm->fm_mapped_extents - 1, i); + + saw_last_extent = true; + } + InsertOrDie(&extents, fme[i].fe_logical, fme[i].fe_length); + VLOG(3) << Substitute("File $0 extent $1: o $2, l $3 $4", + filename_, i, + fme[i].fe_logical, fme[i].fe_length, + saw_last_extent ? "(final)" : ""); + last_extent_end_offset = fme[i].fe_logical + fme[i].fe_length; + if (saw_last_extent) { + break; + } + } + + fm->fm_start = last_extent_end_offset; + } while (!saw_last_extent); + + out->swap(extents); + return Status::OK(); +#endif + } + + virtual const string& filename() const OVERRIDE { + return filename_; + } + + private: + const std::string filename_; + const int fd_; + const bool sync_on_close_; + + AtomicBool pending_sync_; + bool closed_; +}; + +int LockOrUnlock(int fd, bool lock) { + ThreadRestrictions::AssertIOAllowed(); + errno = 0; + struct flock f; + memset(&f, 0, sizeof(f)); + f.l_type = (lock ? 
F_WRLCK : F_UNLCK); + f.l_whence = SEEK_SET; + f.l_start = 0; + f.l_len = 0; // Lock/unlock entire file + return fcntl(fd, F_SETLK, &f); +} + +class PosixFileLock : public FileLock { + public: + int fd_; +}; + +class PosixEnv : public Env { + public: + PosixEnv(); + virtual ~PosixEnv() { + fprintf(stderr, "Destroying Env::Default()\n"); + exit(1); + } + + virtual Status NewSequentialFile(const std::string& fname, + unique_ptr<SequentialFile>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewSequentialFile", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + FILE* f = fopen(fname.c_str(), "r"); + if (f == nullptr) { + return IOError(fname, errno); + } else { + result->reset(new PosixSequentialFile(fname, f)); + return Status::OK(); + } + } + + virtual Status NewRandomAccessFile(const std::string& fname, + unique_ptr<RandomAccessFile>* result) OVERRIDE { + return NewRandomAccessFile(RandomAccessFileOptions(), fname, result); + } + + virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts, + const std::string& fname, + unique_ptr<RandomAccessFile>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewRandomAccessFile", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + int fd = open(fname.c_str(), O_RDONLY); + if (fd < 0) { + return IOError(fname, errno); + } + + result->reset(new PosixRandomAccessFile(fname, fd)); + return Status::OK(); + } + + virtual Status NewWritableFile(const std::string& fname, + unique_ptr<WritableFile>* result) OVERRIDE { + return NewWritableFile(WritableFileOptions(), fname, result); + } + + virtual Status NewWritableFile(const WritableFileOptions& opts, + const std::string& fname, + unique_ptr<WritableFile>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewWritableFile", "path", fname); + int fd; + RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd)); + return InstantiateNewWritableFile(fname, fd, opts, result); + } + + virtual Status NewTempWritableFile(const WritableFileOptions& opts, + const std::string& 
name_template, + std::string* created_filename, + unique_ptr<WritableFile>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewTempWritableFile", "template", name_template); + int fd; + string tmp_filename; + RETURN_NOT_OK(MkTmpFile(name_template, &fd, &tmp_filename)); + RETURN_NOT_OK(InstantiateNewWritableFile(tmp_filename, fd, opts, result)); + created_filename->swap(tmp_filename); + return Status::OK(); + } + + virtual Status NewRWFile(const string& fname, + unique_ptr<RWFile>* result) OVERRIDE { + return NewRWFile(RWFileOptions(), fname, result); + } + + virtual Status NewRWFile(const RWFileOptions& opts, + const string& fname, + unique_ptr<RWFile>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewRWFile", "path", fname); + int fd; + RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd)); + result->reset(new PosixRWFile(fname, fd, opts.sync_on_close)); + return Status::OK(); + } + + virtual Status NewTempRWFile(const RWFileOptions& opts, const std::string& name_template, + std::string* created_filename, unique_ptr<RWFile>* res) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::NewTempRWFile", "template", name_template); + int fd; + RETURN_NOT_OK(MkTmpFile(name_template, &fd, created_filename)); + res->reset(new PosixRWFile(*created_filename, fd, opts.sync_on_close)); + return Status::OK(); + } + + virtual bool FileExists(const std::string& fname) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::FileExists", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + return access(fname.c_str(), F_OK) == 0; + } + + virtual Status GetChildren(const std::string& dir, + std::vector<std::string>* result) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetChildren", "path", dir); + ThreadRestrictions::AssertIOAllowed(); + result->clear(); + DIR* d = opendir(dir.c_str()); + if (d == nullptr) { + return IOError(dir, errno); + } + struct dirent* entry; + // TODO: lint: Consider using readdir_r(...) instead of readdir(...) for improved thread safety. 
+ while ((entry = readdir(d)) != nullptr) { + result->push_back(entry->d_name); + } + closedir(d); + return Status::OK(); + } + + virtual Status DeleteFile(const std::string& fname) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::DeleteFile", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + Status result; + if (unlink(fname.c_str()) != 0) { + result = IOError(fname, errno); + } + return result; + }; + + virtual Status CreateDir(const std::string& name) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::CreateDir", "path", name); + ThreadRestrictions::AssertIOAllowed(); + Status result; + if (mkdir(name.c_str(), 0777) != 0) { + result = IOError(name, errno); + } + return result; + }; + + virtual Status DeleteDir(const std::string& name) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::DeleteDir", "path", name); + ThreadRestrictions::AssertIOAllowed(); + Status result; + if (rmdir(name.c_str()) != 0) { + result = IOError(name, errno); + } + return result; + }; + + Status GetCurrentWorkingDir(string* cwd) const override { + TRACE_EVENT0("io", "PosixEnv::GetCurrentWorkingDir"); + ThreadRestrictions::AssertIOAllowed(); + unique_ptr<char, FreeDeleter> wd(getcwd(NULL, 0)); + if (!wd) { + return IOError("getcwd()", errno); + } + cwd->assign(wd.get()); + return Status::OK(); + } + + Status ChangeDir(const string& dest) override { + TRACE_EVENT1("io", "PosixEnv::ChangeDir", "dest", dest); + ThreadRestrictions::AssertIOAllowed(); + Status result; + if (chdir(dest.c_str()) != 0) { + result = IOError(dest, errno); + } + return result; + } + + virtual Status SyncDir(const std::string& dirname) OVERRIDE { + TRACE_EVENT1("io", "SyncDir", "path", dirname); + ThreadRestrictions::AssertIOAllowed(); + if (FLAGS_never_fsync) return Status::OK(); + int dir_fd; + if ((dir_fd = open(dirname.c_str(), O_DIRECTORY|O_RDONLY)) == -1) { + return IOError(dirname, errno); + } + ScopedFdCloser fd_closer(dir_fd); + if (fsync(dir_fd) != 0) { + return IOError(dirname, errno); + } + return Status::OK(); + } + 
+ virtual Status DeleteRecursively(const std::string &name) OVERRIDE { + return Walk(name, POST_ORDER, Bind(&PosixEnv::DeleteRecursivelyCb, + Unretained(this))); + } + + virtual Status GetFileSize(const std::string& fname, uint64_t* size) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetFileSize", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + Status s; + struct stat sbuf; + if (stat(fname.c_str(), &sbuf) != 0) { + s = IOError(fname, errno); + } else { + *size = sbuf.st_size; + } + return s; + } + + virtual Status GetFileSizeOnDisk(const std::string& fname, uint64_t* size) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDisk", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + Status s; + struct stat sbuf; + if (stat(fname.c_str(), &sbuf) != 0) { + s = IOError(fname, errno); + } else { + // From stat(2): + // + // The st_blocks field indicates the number of blocks allocated to + // the file, 512-byte units. (This may be smaller than st_size/512 + // when the file has holes.) 
+ *size = sbuf.st_blocks * 512; + } + return s; + } + + virtual Status GetFileSizeOnDiskRecursively(const string& root, + uint64_t* bytes_used) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetFileSizeOnDiskRecursively", "path", root); + uint64_t total = 0; + RETURN_NOT_OK(Walk(root, Env::PRE_ORDER, + Bind(&PosixEnv::GetFileSizeOnDiskRecursivelyCb, + Unretained(this), &total))); + *bytes_used = total; + return Status::OK(); + } + + virtual Status GetBlockSize(const string& fname, uint64_t* block_size) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetBlockSize", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + Status s; + struct stat sbuf; + if (stat(fname.c_str(), &sbuf) != 0) { + s = IOError(fname, errno); + } else { + *block_size = sbuf.st_blksize; + } + return s; + } + + virtual Status GetFileModifiedTime(const string& fname, int64_t* timestamp) override { + TRACE_EVENT1("io", "PosixEnv::GetFileModifiedTime", "fname", fname); + ThreadRestrictions::AssertIOAllowed(); + + struct stat s; + if (stat(fname.c_str(), &s) != 0) { + return IOError(fname, errno); + } +#ifdef __APPLE__ + *timestamp = s.st_mtimespec.tv_sec * 1e6 + s.st_mtimespec.tv_nsec / 1e3; +#else + *timestamp = s.st_mtim.tv_sec * 1e6 + s.st_mtim.tv_nsec / 1e3; +#endif + return Status::OK(); + } + + // Local convenience function for safely running statvfs(). 
+ static Status StatVfs(const string& path, struct statvfs* buf) { + ThreadRestrictions::AssertIOAllowed(); + int ret; + RETRY_ON_EINTR(ret, statvfs(path.c_str(), buf)); + if (ret == -1) { + return IOError(Substitute("statvfs: $0", path), errno); + } + return Status::OK(); + } + + virtual Status GetSpaceInfo(const string& path, SpaceInfo* space_info) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::GetSpaceInfo", "path", path); + struct statvfs buf; + RETURN_NOT_OK(StatVfs(path, &buf)); + space_info->capacity_bytes = buf.f_frsize * buf.f_blocks; + space_info->free_bytes = buf.f_frsize * buf.f_bavail; + return Status::OK(); + } + + virtual Status RenameFile(const std::string& src, const std::string& target) OVERRIDE { + MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error, + Status::IOError(Env::kInjectedFailureStatusMsg)); + TRACE_EVENT2("io", "PosixEnv::RenameFile", "src", src, "dst", target); + ThreadRestrictions::AssertIOAllowed(); + Status result; + if (rename(src.c_str(), target.c_str()) != 0) { + result = IOError(src, errno); + } + return result; + } + + virtual Status LockFile(const std::string& fname, FileLock** lock) OVERRIDE { + TRACE_EVENT1("io", "PosixEnv::LockFile", "path", fname); + ThreadRestrictions::AssertIOAllowed(); + *lock = nullptr; + Status result; + int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0666); + if (fd < 0) { + result = IOError(fname, errno); + } else if (LockOrUnlock(fd, true) == -1) { + result = IOError("lock " + fname, errno); + close(fd); + } else { + auto my_lock = new PosixFileLock; + my_lock->fd_ = fd; + *lock = my_lock; + } + return result; + } + + virtual Status UnlockFile(FileLock* lock) OVERRIDE { + TRACE_EVENT0("io", "PosixEnv::UnlockFile"); + ThreadRestrictions::AssertIOAllowed(); + PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); + Status result; + if (LockOrUnlock(my_lock->fd_, false) == -1) { + result = IOError("unlock", errno); + } + close(my_lock->fd_); + delete my_lock; + return result; + } + + virtual Status 
GetTestDirectory(std::string* result) OVERRIDE { + string dir; + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + dir = env; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "/tmp/kudutest-%d", static_cast<int>(geteuid())); + dir = buf; + } + // Directory may already exist + ignore_result(CreateDir(dir)); + // /tmp may be a symlink, so canonicalize the path. + return Canonicalize(dir, result); + } + + virtual uint64_t gettid() OVERRIDE { + // Platform-independent thread ID. We can't use pthread_self here, + // because that function returns a totally opaque ID, which can't be + // compared via normal means. + if (thread_local_id == 0) { + thread_local_id = Barrier_AtomicIncrement(&cur_thread_local_id_, 1); + } + return thread_local_id; + } + + virtual uint64_t NowMicros() OVERRIDE { + struct timeval tv; + gettimeofday(&tv, nullptr); + return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; + } + + virtual void SleepForMicroseconds(int micros) OVERRIDE { + ThreadRestrictions::AssertWaitAllowed(); + SleepFor(MonoDelta::FromMicroseconds(micros)); + } + + virtual Status GetExecutablePath(string* path) OVERRIDE { + uint32_t size = 64; + uint32_t len = 0; + while (true) { + unique_ptr<char[]> buf(new char[size]); +#if defined(__linux__) + int rc = readlink("/proc/self/exe", buf.get(), size); + if (rc == -1) { + return IOError("Unable to determine own executable path", errno); + } else if (rc >= size) { + // The buffer wasn't large enough + size *= 2; + continue; + } + len = rc; +#elif defined(__APPLE__) + if (_NSGetExecutablePath(buf.get(), &size) != 0) { + // The buffer wasn't large enough; 'size' has been updated. 
+        continue;
+      }
+      len = strlen(buf.get());
+#else
+#error Unsupported platform
+#endif
+
+      path->assign(buf.get(), len);
+      break;
+    }
+    return Status::OK();
+  }
+
+  // Sets '*is_dir' to whether 'path' names a directory. Uses stat(2), so a
+  // symlink to a directory reports true. If 'path' cannot be stat'ed an
+  // IOError is returned and '*is_dir' is left unmodified.
+  virtual Status IsDirectory(const string& path, bool* is_dir) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::IsDirectory", "path", path);
+    ThreadRestrictions::AssertIOAllowed();
+    Status s;
+    struct stat sbuf;
+    if (stat(path.c_str(), &sbuf) != 0) {
+      s = IOError(path, errno);
+    } else {
+      *is_dir = S_ISDIR(sbuf.st_mode);
+    }
+    return s;
+  }
+
+  // Walks the tree rooted at 'root' with fts(3), invoking 'cb' for every
+  // non-directory entry and for every directory in the requested 'order'
+  // (pre- or post-order). Symlinks are not followed (FTS_PHYSICAL) and the
+  // walk stays on one filesystem (FTS_XDEV). Unreadable entries and callback
+  // failures do not stop the walk; they are reported afterwards as a single
+  // aggregate IOError.
+  virtual Status Walk(const string& root, DirectoryOrder order, const WalkCallback& cb) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::Walk", "path", root);
+    ThreadRestrictions::AssertIOAllowed();
+    // Some sanity checks
+    CHECK_NE(root, "/");
+    CHECK_NE(root, "./");
+    CHECK_NE(root, ".");
+    CHECK_NE(root, "");
+
+    // FTS requires a non-const copy of the name. strdup it and free() when
+    // we leave scope.
+    unique_ptr<char, FreeDeleter> name_dup(strdup(root.c_str()));
+    char *(paths[]) = { name_dup.get(), nullptr };
+
+    // FTS_NOCHDIR is important here to make this thread-safe.
+    unique_ptr<FTS, FtsCloser> tree(
+        fts_open(paths, FTS_PHYSICAL | FTS_XDEV | FTS_NOCHDIR, nullptr));
+    if (!tree.get()) {
+      return IOError(root, errno);
+    }
+
+    FTSENT *ent = nullptr;
+    bool had_errors = false;
+    while ((ent = fts_read(tree.get())) != nullptr) {
+      bool doCb = false;
+      FileType type = DIRECTORY_TYPE;
+      switch (ent->fts_info) {
+        case FTS_D:         // Directory in pre-order
+          if (order == PRE_ORDER) {
+            doCb = true;
+          }
+          break;
+        case FTS_DP:        // Directory in post-order
+          if (order == POST_ORDER) {
+            doCb = true;
+          }
+          break;
+        case FTS_F:         // A regular file
+        case FTS_SL:        // A symbolic link
+        case FTS_SLNONE:    // A broken symbolic link
+        case FTS_DEFAULT:   // Unknown type of file
+          doCb = true;
+          type = FILE_TYPE;
+          break;
+
+        case FTS_ERR:
+          LOG(WARNING) << "Unable to access file " << ent->fts_path
+                       << " during walk: " << strerror(ent->fts_errno);
+          had_errors = true;
+          break;
+
+        default:
+          // Other codes (e.g. FTS_DNR, FTS_NS, FTS_DC) are only logged; they
+          // do not make Walk() return an error.
+          LOG(WARNING) << "Unable to access file " << ent->fts_path
+                       << " during walk (code " << ent->fts_info << ")";
+          break;
+      }
+      if (doCb) {
+        if (!cb.Run(type, DirName(ent->fts_path), ent->fts_name).ok()) {
+          had_errors = true;
+        }
+      }
+    }
+
+    if (had_errors) {
+      return Status::IOError(root, "One or more errors occurred");
+    }
+    return Status::OK();
+  }
+
+  // Appends to 'paths' every existing path matching the glob(3) pattern
+  // 'path_pattern' ('~' is expanded via GLOB_TILDE). A pattern that matches
+  // nothing is not an error; read errors abort the glob (GLOB_ERR).
+  Status Glob(const string& path_pattern, vector<string>* paths) override {
+    TRACE_EVENT1("io", "PosixEnv::Glob", "path_pattern", path_pattern);
+    ThreadRestrictions::AssertIOAllowed();
+
+    glob_t result;
+    auto cleanup = MakeScopedCleanup([&] { globfree(&result); });
+
+    int ret = glob(path_pattern.c_str(), GLOB_TILDE | GLOB_ERR, nullptr, &result);
+    switch (ret) {
+      case 0: break;
+      case GLOB_NOMATCH: return Status::OK();
+      case GLOB_NOSPACE: return Status::RuntimeError("glob out of memory");
+      default: return Status::IOError("glob failure", std::to_string(ret));
+    }
+
+    for (size_t i = 0; i < result.gl_pathc; ++i) {
+      paths->emplace_back(result.gl_pathv[i]);
+    }
+    return Status::OK();
+  }
+
+  // Stores in '*result' the absolute path of 'path' as resolved by
+  // realpath(3): symlinks followed and '.'/'..' components removed.
+  virtual Status Canonicalize(const string& path, string* result) OVERRIDE {
+    TRACE_EVENT1("io", "PosixEnv::Canonicalize", "path", path);
+    ThreadRestrictions::AssertIOAllowed();
+    unique_ptr<char[], FreeDeleter> r(realpath(path.c_str(), nullptr));
+    if (!r) {
+      return IOError(Substitute("Unable to canonicalize $0", path), errno);
+    }
+    *result = string(r.get());
+    return Status::OK();
+  }
+
+  // Stores the total amount of physical memory, in bytes, in '*ram'.
+  virtual Status GetTotalRAMBytes(int64_t* ram) OVERRIDE {
+#if defined(__APPLE__)
+    int mib[2];
+    size_t length = sizeof(*ram);
+
+    // Get the Physical memory size
+    mib[0] = CTL_HW;
+    mib[1] = HW_MEMSIZE;
+    CHECK_ERR(sysctl(mib, 2, ram, &length, nullptr, 0)) << "sysctl CTL_HW HW_MEMSIZE failed";
+#else
+    struct sysinfo info;
+    if (sysinfo(&info) < 0) {
+      return IOError("sysinfo() failed", errno);
+    }
+    // sysinfo(2) reports memory sizes as multiples of 'mem_unit' bytes, so the
+    // raw 'totalram' count must be scaled to obtain a byte count (mem_unit is
+    // not always 1, e.g. on 32-bit kernels with large memory).
+    *ram = static_cast<int64_t>(info.totalram) * info.mem_unit;
+#endif
+    return Status::OK();
+  }
+
+  // Returns the current soft limit on open file descriptors (RLIMIT_NOFILE).
+  virtual int64_t GetOpenFileLimit() OVERRIDE {
+    // There's no reason for this to ever fail.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+    return l.rlim_cur;
+  }
+
+  // Raises the soft RLIMIT_NOFILE limit up to the hard limit, if it is lower.
+  virtual void IncreaseOpenFileLimit() OVERRIDE {
+    // There's no reason for this to ever fail; any process should have
+    // sufficient privilege to increase its soft limit up to the hard limit.
+    //
+    // This change is logged because it is process-wide.
+    struct rlimit l;
+    PCHECK(getrlimit(RLIMIT_NOFILE, &l) == 0);
+#if defined(__APPLE__)
+    // OS X 10.11 can return RLIM_INFINITY from getrlimit, but allows rlim_cur and
+    // rlim_max to be raised only as high as the value of the maxfilesperproc
+    // kernel variable. Empirically, this value is 10240 across all tested macOS
+    // versions. Testing on OS X 10.10 and macOS 10.12 revealed that getrlimit
+    // returns the true limits (not RLIM_INFINITY), rlim_max can *not* be raised
+    // (when running as non-root), and rlim_cur can only be raised as high as
+    // rlim_max (this is consistent with Linux).
+    // TLDR; OS X 10.11 is wack.
+    if (l.rlim_max == RLIM_INFINITY) {
+      uint32_t limit;
+      size_t len = sizeof(limit);
+      PCHECK(sysctlbyname("kern.maxfilesperproc", &limit, &len, nullptr, 0) == 0);
+      // Make sure no uninitialized bits are present in the result.
+      DCHECK_EQ(sizeof(limit), len);
+      l.rlim_max = limit;
+    }
+#endif
+    if (l.rlim_cur < l.rlim_max) {
+      LOG(INFO) << Substitute("Raising process file limit from $0 to $1",
+                              l.rlim_cur, l.rlim_max);
+      l.rlim_cur = l.rlim_max;
+      PCHECK(setrlimit(RLIMIT_NOFILE, &l) == 0);
+    } else {
+      LOG(INFO) << Substitute("Not raising process file limit of $0; it is "
+          "already as high as it can go", l.rlim_cur);
+    }
+  }
+
+  // Sets '*result' to whether 'path' resides on a filesystem whose statfs(2)
+  // magic number is EXT4_SUPER_MAGIC (ext2/ext3/ext4 share that value).
+  // Always false on macOS.
+  virtual Status IsOnExtFilesystem(const string& path, bool* result) OVERRIDE {
+    TRACE_EVENT0("io", "PosixEnv::IsOnExtFilesystem");
+    ThreadRestrictions::AssertIOAllowed();
+
+#ifdef __APPLE__
+    *result = false;
+#else
+    struct statfs buf;
+    int ret;
+    RETRY_ON_EINTR(ret, statfs(path.c_str(), &buf));
+    if (ret == -1) {
+      return IOError(Substitute("statfs: $0", path), errno);
+    }
+    *result = (buf.f_type == EXT4_SUPER_MAGIC);
+#endif
+    return Status::OK();
+  }
+
+  // Returns the running kernel's release string (utsname::release from uname(2)).
+  virtual string GetKernelRelease() OVERRIDE {
+    // There's no reason for this to ever fail.
+    struct utsname u;
+    PCHECK(uname(&u) == 0);
+    return string(u.release);
+  }
+
+  // If 'path' has any permission bit set that the process umask would clear,
+  // logs a warning and chmods it to 'old_perms & ~umask'. Requires the umask
+  // to have been parsed already (g_parsed_umask != -1).
+  Status EnsureFileModeAdheresToUmask(const string& path) override {
+    struct stat s;
+    if (stat(path.c_str(), &s) != 0) {
+      // Include the path, matching the other error paths in this class.
+      return IOError(Substitute("stat: $0", path), errno);
+    }
+    CHECK_NE(g_parsed_umask, -1);
+    if (s.st_mode & g_parsed_umask) {
+      uint32_t old_perms = s.st_mode & ACCESSPERMS;
+      uint32_t new_perms = old_perms & ~g_parsed_umask;
+      LOG(WARNING) << "Path " << path << " has permissions "
+                   << StringPrintf("%03o", old_perms)
+                   << " which are less restrictive than current umask value "
+                   << StringPrintf("%03o", g_parsed_umask)
+                   << ": resetting permissions to "
+                   << StringPrintf("%03o", new_perms);
+      if (chmod(path.c_str(), new_perms) != 0) {
+        return IOError(Substitute("chmod: $0", path), errno);
+      }
+    }
+    return Status::OK();
+  }
+
+ private:
+  // unique_ptr Deleter implementation for fts_close
+  struct FtsCloser {
+    void operator()(FTS *fts) const {
+      if (fts) { fts_close(fts); }
+    }
+  };
+
+  // Creates a uniquely-named file from 'name_template' with mkstemp(3) (which
+  // requires the template to end in "XXXXXX"). The file's mode is then set to
+  // 0666 & ~umask. On success, stores the open descriptor in '*fd' and the
+  // generated name in '*created_filename'. FLAGS_env_inject_io_error acts as a
+  // fault-injection hook here.
+  Status MkTmpFile(const string& name_template, int* fd, string* created_filename) {
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
+    ThreadRestrictions::AssertIOAllowed();
+    unique_ptr<char[]> fname(new char[name_template.size() + 1]);
+    ::snprintf(fname.get(), name_template.size() + 1, "%s", name_template.c_str());
+    int created_fd = mkstemp(fname.get());
+    if (created_fd < 0) {
+      return IOError(Substitute("Call to mkstemp() failed on name template $0", name_template),
+                     errno);
+    }
+    // mkstemp defaults to making files with permissions 0600. But, if the
+    // user configured a more permissive umask, then we ensure that the
+    // resulting file gets the desired (wider) permissions.
+    uint32_t new_perms = 0666 & ~g_parsed_umask;
+    if (new_perms != 0600) {
+      CHECK_ERR(fchmod(created_fd, new_perms));
+    }
+    *fd = created_fd;
+    *created_filename = fname.get();
+    return Status::OK();
+  }
+
+  // Wraps the already-open 'fd' for 'fname' in a PosixWritableFile stored in
+  // '*result'. For OPEN_EXISTING the file's current size is looked up and
+  // passed to the writer; otherwise a size of 0 is passed.
+  Status InstantiateNewWritableFile(const std::string& fname,
+                                    int fd,
+                                    const WritableFileOptions& opts,
+                                    unique_ptr<WritableFile>* result) {
+    uint64_t file_size = 0;
+    if (opts.mode == OPEN_EXISTING) {
+      RETURN_NOT_OK(GetFileSize(fname, &file_size));
+    }
+    result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
+    return Status::OK();
+  }
+
+  // Walk() callback for recursive deletion: deletes 'dirname/basename' as a
+  // file or a directory depending on 'type', logging a warning on failure
+  // and returning the deletion status.
+  Status DeleteRecursivelyCb(FileType type, const string& dirname, const string& basename) {
+    string full_path = JoinPathSegments(dirname, basename);
+    Status s;
+    switch (type) {
+      case FILE_TYPE:
+        s = DeleteFile(full_path);
+        WARN_NOT_OK(s, "Could not delete file");
+        return s;
+      case DIRECTORY_TYPE:
+        s = DeleteDir(full_path);
+        WARN_NOT_OK(s, "Could not delete directory");
+        return s;
+      default:
+        LOG(FATAL) << "Unknown file type: " << type;
+        return Status::OK();
+    }
+  }
+
+  // Walk() callback that adds the on-disk size of each non-directory entry to
+  // '*bytes_used'; directories contribute nothing.
+  Status GetFileSizeOnDiskRecursivelyCb(uint64_t* bytes_used,
+                                        Env::FileType type,
+                                        const string& dirname,
+                                        const string& basename) {
+    uint64_t file_bytes_used = 0;
+    switch (type) {
+      case Env::FILE_TYPE:
+        RETURN_NOT_OK(GetFileSizeOnDisk(
+            JoinPathSegments(dirname, basename), &file_bytes_used));
+        *bytes_used += file_bytes_used;
+        break;
+      case Env::DIRECTORY_TYPE:
+        // Ignore directory space consumption as it varies from filesystem to
+        // filesystem.
+        break;
+      default:
+        LOG(FATAL) << "Unknown file type: " << type;
+    }
+    return Status::OK();
+  }
+};
+
+PosixEnv::PosixEnv() {}
+
+}  // namespace
+
+// The process-wide default Env is created lazily, exactly once (pthread_once),
+// and is intentionally never deleted.
+static pthread_once_t once = PTHREAD_ONCE_INIT;
+static Env* default_env;
+static void InitDefaultEnv() { default_env = new PosixEnv; }
+
+Env* Env::Default() {
+  pthread_once(&once, InitDefaultEnv);
+  return default_env;
+}
+
+}  // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env_util-test.cc b/be/src/kudu/util/env_util-test.cc
new file mode 100644
index 0000000..78bb006
--- /dev/null
+++ b/be/src/kudu/util/env_util-test.cc
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <memory>
+#include <unordered_set>
+
+#include <gflags/gflags.h>
+#include <glog/stl_logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(disk_reserved_bytes_free_for_testing);
+
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace env_util {
+
+// Fixture for the env_util tests. 'env_' and 'test_dir_' used below are
+// presumably provided by KuduTest.
+class EnvUtilTest: public KuduTest {
+};
+
+// Assert that Status 's' indicates there is not enough space left on the
+// device for the request.
+static void AssertNoSpace(const Status& s) {
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_EQ(ENOSPC, s.posix_code());
+  ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+}
+
+// Exercises VerifySufficientDiskSpace(): an explicit reservation, the 1%
+// default reservation (-1), and a simulated full disk.
+TEST_F(EnvUtilTest, TestDiskSpaceCheck) {
+  const int64_t kZeroRequestedBytes = 0;
+  const int64_t kRequestOnePercentReservation = -1;
+  int64_t reserved_bytes = 0;
+  ASSERT_OK(VerifySufficientDiskSpace(env_, test_dir_, kZeroRequestedBytes, reserved_bytes));
+
+  // Check 1% reservation logic. We loop this in case there are other FS
+  // operations happening concurrent with this test.
+  ASSERT_EVENTUALLY([&] {
+    SpaceInfo space_info;
+    ASSERT_OK(env_->GetSpaceInfo(test_dir_, &space_info));
+    // Try for 1 less byte than 1% free. This request should be rejected.
+    int64_t target_free_bytes = (space_info.capacity_bytes / 100) - 1;
+    int64_t bytes_to_request = std::max<int64_t>(0, space_info.free_bytes - target_free_bytes);
+    NO_FATALS(AssertNoSpace(VerifySufficientDiskSpace(env_, test_dir_, bytes_to_request,
+                                                      kRequestOnePercentReservation)));
+  });
+
+  // Make it seem as if the disk is full and specify that we should have
+  // reserved 200 bytes. Even asking for 0 bytes should return an error
+  // indicating we are out of space.
+  FLAGS_disk_reserved_bytes_free_for_testing = 0;
+  reserved_bytes = 200;
+  NO_FATALS(AssertNoSpace(VerifySufficientDiskSpace(env_, test_dir_, kZeroRequestedBytes,
+                                                    reserved_bytes)));
+}
+
+// Ensure that we can recursively create directories using both absolute and
+// relative paths.
+TEST_F(EnvUtilTest, TestCreateDirsRecursively) {
+  // Absolute path.
+  string path = JoinPathSegments(test_dir_, "a/b/c");
+  ASSERT_OK(CreateDirsRecursively(env_, path));
+  bool is_dir = false;
+  ASSERT_OK(env_->IsDirectory(path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Repeating the previous command should also succeed (it should be a no-op).
+  ASSERT_OK(CreateDirsRecursively(env_, path));
+  ASSERT_OK(env_->IsDirectory(path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Relative path.
+  ASSERT_OK(env_->ChangeDir(test_dir_)); // Change to test dir to keep CWD clean.
+  string rel_base = Substitute("$0-$1", CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+  ASSERT_FALSE(env_->FileExists(rel_base));
+  path = JoinPathSegments(rel_base, "x/y/z");
+  ASSERT_OK(CreateDirsRecursively(env_, path));
+  ASSERT_OK(env_->IsDirectory(path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // Directory creation should fail if a file is a part of the path.
+  path = JoinPathSegments(test_dir_, "x/y/z");
+  string file_path = JoinPathSegments(test_dir_, "x"); // Conflicts with 'path'.
+  ASSERT_FALSE(env_->FileExists(path));
+  ASSERT_FALSE(env_->FileExists(file_path));
+  // Create an empty file in the path.
+  unique_ptr<WritableFile> out;
+  ASSERT_OK(env_->NewWritableFile(file_path, &out));
+  ASSERT_OK(out->Close());
+  ASSERT_TRUE(env_->FileExists(file_path));
+  // Fail.
+  Status s = CreateDirsRecursively(env_, path);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "File exists");
+
+  // We should be able to create a directory tree even when a symlink exists as
+  // part of the path.
+  path = JoinPathSegments(test_dir_, "link/a/b");
+  string link_path = JoinPathSegments(test_dir_, "link");
+  string real_dir = JoinPathSegments(test_dir_, "real_dir");
+  ASSERT_OK(env_->CreateDir(real_dir));
+  PCHECK(symlink(real_dir.c_str(), link_path.c_str()) == 0);
+  ASSERT_OK(CreateDirsRecursively(env_, path));
+  ASSERT_OK(env_->IsDirectory(path, &is_dir));
+  ASSERT_TRUE(is_dir);
+}
+
+// Ensure that DeleteExcessFilesByPattern() works.
+// We ensure that the number of files remaining after running it is the number
+// expected, and we manually set the modification times on the relevant files
+// to allow us to test that files are deleted oldest-first.
+TEST_F(EnvUtilTest, TestDeleteExcessFilesByPattern) {
+  string dir = JoinPathSegments(test_dir_, "excess");
+  ASSERT_OK(env_->CreateDir(dir));
+  vector<string> filenames = {"a", "b", "c", "d"};
+  // GetCurrentTimeMicros() returns microseconds; convert to whole seconds and
+  // keep the result in a time_t so it neither truncates nor wraps as an int.
+  const time_t now_sec = GetCurrentTimeMicros() / 1000000;
+  for (int i = 0; i < static_cast<int>(filenames.size()); i++) {
+    const string& filename = filenames[i];
+    string path = JoinPathSegments(dir, filename);
+    unique_ptr<WritableFile> file;
+    ASSERT_OK(env_->NewWritableFile(path, &file));
+    ASSERT_OK(file->Close());
+
+    // Set the last-modified time of the file. Each file is 2 seconds newer
+    // than the previous one, so "a" is the oldest and "d" the newest.
+    struct timeval target_time { .tv_sec = now_sec + (i * 2), .tv_usec = 0 };
+    struct timeval times[2] = { target_time, target_time };
+    ASSERT_EQ(0, utimes(path.c_str(), times)) << errno;
+  }
+  vector<string> children;
+  ASSERT_OK(env_->GetChildren(dir, &children));
+  ASSERT_EQ(6, children.size()); // 4 files plus "." and "..".
+  ASSERT_OK(DeleteExcessFilesByPattern(env_, dir + "/*", 2));
+  ASSERT_OK(env_->GetChildren(dir, &children));
+  ASSERT_EQ(4, children.size()); // 2 files plus "." and "..".
+  unordered_set<string> children_set(children.begin(), children.end());
+  unordered_set<string> expected_set({".", "..", "c", "d"});
+  ASSERT_EQ(expected_set, children_set) << children;
+}
+
+}  // namespace env_util
+}  // namespace kudu
