http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/env-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env-test.cc b/be/src/kudu/util/env-test.cc
new file mode 100644
index 0000000..1c7f899
--- /dev/null
+++ b/be/src/kudu/util/env-test.cc
@@ -0,0 +1,1173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#if !defined(__APPLE__)
+#include <linux/falloc.h>
+#endif  // !defined(__APPLE__)
+// Copied from falloc.h. Useful for older kernels that lack support for
+// hole punching; fallocate(2) will return EOPNOTSUPP.
+#ifndef FALLOC_FL_KEEP_SIZE
+#define FALLOC_FL_KEEP_SIZE 0x01 /* default is extend size */
+#endif
+#ifndef FALLOC_FL_PUNCH_HOLE
+#define FALLOC_FL_PUNCH_HOLE  0x02 /* de-allocates range */
+#endif
+
+#include "kudu/util/env.h"
+
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h> // IWYU pragma: keep
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/array_view.h" // IWYU pragma: keep
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(never_fsync);
+DECLARE_bool(crash_on_eio);
+DECLARE_double(env_inject_eio);
+DECLARE_int32(env_inject_short_read_bytes);
+DECLARE_int32(env_inject_short_write_bytes);
+DECLARE_string(env_inject_eio_globs);
+
+namespace kudu {
+
+using std::pair;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+// Byte counts shared by the tests below: one and two mebibytes.
+static const uint64_t kOneMb = 1024ULL * 1024ULL;
+static const uint64_t kTwoMb = kOneMb * 2;
+
+// Test fixture shared by the Env tests in this file.
+//
+// On top of KuduTest it probes (once per process) whether the file system
+// hosting the test directory supports fallocate() and hole punching, and
+// offers helpers for generating, writing and verifying patterned test data.
+class TestEnv : public KuduTest {
+ public:
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    CheckFallocateSupport();
+  }
+
+  // Verify that fallocate() is supported in the test directory.
+  // Some local file systems like ext3 do not support it, and we don't
+  // want to fail tests on those systems.
+  //
+  // Sets fallocate_supported_ and fallocate_punch_hole_supported_ based on
+  // the result. The probe itself runs only on the first call; later calls
+  // return immediately.
+  void CheckFallocateSupport() {
+    static bool checked = false;
+    if (checked) return;
+
+#if defined(__linux__)
+    int fd;
+    RETRY_ON_EINTR(fd, creat(GetTestPath("check-fallocate").c_str(), S_IWUSR));
+    CHECK_ERR(fd);
+    int err;
+    RETRY_ON_EINTR(err, fallocate(fd, 0, 0, 4096));
+    if (err != 0) {
+      // The only tolerated failure is "operation not supported".
+      PCHECK(errno == ENOTSUP);
+    } else {
+      fallocate_supported_ = true;
+
+      RETRY_ON_EINTR(err, fallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
+                                    1024, 1024));
+      if (err != 0) {
+        PCHECK(errno == ENOTSUP);
+      } else {
+        fallocate_punch_hole_supported_ = true;
+      }
+    }
+
+    RETRY_ON_EINTR(err, close(fd));
+#endif
+
+    checked = true;
+  }
+
+ protected:
+
+  // Asserts that 'read_data' holds the test pattern in which the byte at
+  // absolute file offset 'o' equals (o * 31) & 0xff. 'offset' is the file
+  // offset of the first byte of 'read_data'.
+  void VerifyTestData(const Slice& read_data, size_t offset) {
+    for (size_t i = 0; i < read_data.size(); i++) {
+      size_t file_offset = offset + i;
+      ASSERT_EQ((file_offset * 31) & 0xff, read_data[i]) << "failed at " << i;
+    }
+  }
+
+  // Builds 'num_iterations' vectors of 'num_slices' slices, each slice being
+  // 'slice_size' bytes long.
+  //
+  // The backing buffers are stored in 'data' (which must outlive 'vec') and
+  // are filled with the same running (byte index * 31) & 0xff pattern that
+  // VerifyTestData() expects, continuing across slices and iterations.
+  void MakeVectors(int num_slices, int slice_size, int num_iterations,
+                   unique_ptr<faststring[]>* data, vector<vector<Slice > >* vec) {
+    data->reset(new faststring[num_iterations * num_slices]);
+    vec->resize(num_iterations);
+
+    int data_idx = 0;
+    int byte_idx = 0;
+    for (int vec_idx = 0; vec_idx < num_iterations; vec_idx++) {
+      vector<Slice>& iter_vec = vec->at(vec_idx);
+      iter_vec.resize(num_slices);
+      for (int i = 0; i < num_slices; i++) {
+        (*data)[data_idx].resize(slice_size);
+        for (int j = 0; j < slice_size; j++) {
+          (*data)[data_idx][j] = (byte_idx * 31) & 0xff;
+          ++byte_idx;
+        }
+        iter_vec[i]= Slice((*data)[data_idx]);
+        ++data_idx;
+      }
+    }
+  }
+
+  // Reads 'n' bytes from 'raf' starting at 'offset' and asserts that they
+  // match the test pattern.
+  void ReadAndVerifyTestData(RandomAccessFile* raf, size_t offset, size_t n) {
+    unique_ptr<uint8_t[]> scratch(new uint8_t[n]);
+    Slice s(scratch.get(), n);
+    ASSERT_OK(raf->Read(offset, s));
+    ASSERT_NO_FATAL_FAILURE(VerifyTestData(s, offset));
+  }
+
+  // Appends 'iterations' batches of 'num_slices' slices of 'slice_size'
+  // bytes to a fresh file opened with 'opts', then verifies its contents.
+  //
+  // If 'fast' is true every batch is written with AppendV() and the file is
+  // verified only once at the end. Otherwise each batch is written either
+  // with AppendV() or slice-by-slice with Append() (chosen pseudo-randomly),
+  // and is read back and verified right after it is written.
+  //
+  // If 'pre_allocate' is true the full file size is preallocated up front.
+  // Short writes of half a slice are injected for the whole run.
+  void TestAppendV(size_t num_slices, size_t slice_size, size_t iterations,
+                   bool fast, bool pre_allocate,
+                   const WritableFileOptions &opts) {
+    const string kTestPath = GetTestPath("test_env_appendvec_read_append");
+    shared_ptr<WritableFile> file;
+    ASSERT_OK(env_util::OpenFileForWrite(opts, env_, kTestPath, &file));
+
+    if (pre_allocate) {
+      ASSERT_OK(file->PreAllocate(num_slices * slice_size * iterations));
+      ASSERT_OK(file->Sync());
+    }
+
+    unique_ptr<faststring[]> data;
+    vector<vector<Slice> > input;
+
+    MakeVectors(num_slices, slice_size, iterations, &data, &input);
+
+    // Force short writes to half the slice length.
+    FLAGS_env_inject_short_write_bytes = slice_size / 2;
+
+    shared_ptr<RandomAccessFile> raf;
+
+    if (!fast) {
+      ASSERT_OK(env_util::OpenFileForRandom(env_, kTestPath, &raf));
+    }
+
+    // Seed the generator actually used below: random() is seeded by
+    // srandom(), not by srand().
+    srandom(123);
+
+    const string test_descr = Substitute(
+        "appending a vector of slices(number of slices=$0,size of slice=$1 b) $2 times",
+        num_slices, slice_size, iterations);
+    LOG_TIMING(INFO, test_descr)  {
+      for (size_t i = 0; i < iterations; i++) {
+        if (fast || random() % 2) {
+          ASSERT_OK(file->AppendV(input[i]));
+        } else {
+          for (const Slice& slice : input[i]) {
+            ASSERT_OK(file->Append(slice));
+          }
+        }
+        if (!fast) {
+          // Verify as write. Note: this requires that file is pre-allocated, otherwise
+          // the Read() fails with EINVAL.
+          ASSERT_NO_FATAL_FAILURE(ReadAndVerifyTestData(raf.get(), num_slices * slice_size * i,
+                                                        num_slices * slice_size));
+        }
+      }
+    }
+
+    // Verify the entire file
+    ASSERT_OK(file->Close());
+
+    if (fast) {
+      ASSERT_OK(env_util::OpenFileForRandom(env_, kTestPath, &raf));
+    }
+    for (size_t i = 0; i < iterations; i++) {
+      ASSERT_NO_FATAL_FAILURE(ReadAndVerifyTestData(raf.get(), num_slices * slice_size * i,
+                                                    num_slices * slice_size));
+    }
+  }
+
+  // Results of CheckFallocateSupport(); shared by all tests in the process.
+  static bool fallocate_supported_;
+  static bool fallocate_punch_hole_supported_;
+};
+
+bool TestEnv::fallocate_supported_ = false;
+bool TestEnv::fallocate_punch_hole_supported_ = false;
+
+// Verifies that PreAllocate() grows the on-disk file without changing the
+// size reported by the WritableFile, and that Close() leaves the file at the
+// written size.
+TEST_F(TestEnv, TestPreallocate) {
+  if (!fallocate_supported_) {
+    LOG(INFO) << "fallocate not supported, skipping test";
+    return;
+  }
+  LOG(INFO) << "Testing PreAllocate()";
+  string test_path = GetTestPath("test_env_wf");
+  shared_ptr<WritableFile> file;
+  ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &file));
+
+  // pre-allocate 1 MB
+  ASSERT_OK(file->PreAllocate(kOneMb));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should report 0
+  ASSERT_EQ(file->Size(), 0);
+  // but the real size of the file on disk should report 1MB
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(size, kOneMb);
+
+  // write 1 MB
+  // The buffer is heap-allocated and zero-filled: a 1 MB stack array is
+  // needlessly large and would hand uninitialized bytes to Append().
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kOneMb]());
+  Slice slice(scratch.get(), kOneMb);
+  ASSERT_OK(file->Append(slice));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should now report 1 MB
+  ASSERT_EQ(file->Size(), kOneMb);
+  ASSERT_OK(file->Close());
+  // and the real size for the file on disk should match only the
+  // written size
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(kOneMb, size);
+}
+
+// To test consecutive pre-allocations we need higher pre-allocations since the
+// mmapped regions grow in size until 2MBs (so smaller pre-allocations will easily
+// be smaller than the mmapped regions size).
+TEST_F(TestEnv, TestConsecutivePreallocate) {
+  if (!fallocate_supported_) {
+    LOG(INFO) << "fallocate not supported, skipping test";
+    return;
+  }
+  LOG(INFO) << "Testing consecutive PreAllocate()";
+  string test_path = GetTestPath("test_env_wf");
+  shared_ptr<WritableFile> file;
+  ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &file));
+
+  // pre-allocate 64 MB
+  ASSERT_OK(file->PreAllocate(64 * kOneMb));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should report 0
+  ASSERT_EQ(file->Size(), 0);
+  // but the real size of the file on disk should report 64 MBs
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(size, 64 * kOneMb);
+
+  // write 1 MB
+  // The buffer is heap-allocated and zero-filled: a 1 MB stack array is
+  // needlessly large and would hand uninitialized bytes to Append().
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kOneMb]());
+  Slice slice(scratch.get(), kOneMb);
+  ASSERT_OK(file->Append(slice));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should now report 1 MB
+  ASSERT_EQ(kOneMb, file->Size());
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(64 * kOneMb, size);
+
+  // pre-allocate 64 additional MBs
+  ASSERT_OK(file->PreAllocate(64 * kOneMb));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should now report 1 MB
+  ASSERT_EQ(kOneMb, file->Size());
+  // while the real file size should report 128 MB's
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(128 * kOneMb, size);
+
+  // write another MB
+  ASSERT_OK(file->Append(slice));
+  ASSERT_OK(file->Sync());
+
+  // the writable file size should now report 2 MB
+  ASSERT_EQ(file->Size(), 2 * kOneMb);
+  // while the real file size should remain at 128 MBs
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(128 * kOneMb, size);
+
+  // close the file (which ftruncates it to the real size)
+  ASSERT_OK(file->Close());
+  // and the real size for the file on disk should match only the written size
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(2* kOneMb, size);
+}
+
+// Verifies that PunchHole() reduces the file's size on disk by the punched
+// amount while leaving its logical size untouched. Skipped when the file
+// system does not support hole punching.
+TEST_F(TestEnv, TestHolePunch) {
+  if (!fallocate_punch_hole_supported_) {
+    LOG(INFO) << "hole punching not supported, skipping test";
+    return;
+  }
+  string test_path = GetTestPath("test_env_wf");
+  unique_ptr<RWFile> file;
+  ASSERT_OK(env_->NewRWFile(test_path, &file));
+
+  // Write 1 MB. The size and size-on-disk both agree.
+  // The buffer is heap-allocated and zero-filled: a 1 MB stack array is
+  // needlessly large and would hand uninitialized bytes to Write().
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kOneMb]());
+  Slice slice(scratch.get(), kOneMb);
+  ASSERT_OK(file->Write(0, slice));
+  ASSERT_OK(file->Sync());
+  uint64_t sz;
+  ASSERT_OK(file->Size(&sz));
+  ASSERT_EQ(kOneMb, sz);
+  uint64_t size_on_disk;
+  ASSERT_OK(env_->GetFileSizeOnDisk(test_path, &size_on_disk));
+  // Some kernels and filesystems (e.g. Centos 6.6 with XFS) aggressively
+  // preallocate file disk space when writing to files, so the disk space may be
+  // greater than 1MiB.
+  ASSERT_LE(kOneMb, size_on_disk);
+
+  // Punch some data out at byte marker 4096. Now the two sizes diverge.
+  uint64_t punch_amount = 4096 * 4;
+  uint64_t new_size_on_disk;
+  ASSERT_OK(file->PunchHole(4096, punch_amount));
+  ASSERT_OK(file->Size(&sz));
+  ASSERT_EQ(kOneMb, sz);
+  ASSERT_OK(env_->GetFileSizeOnDisk(test_path, &new_size_on_disk));
+  ASSERT_EQ(size_on_disk - punch_amount, new_size_on_disk);
+}
+
+// Times repeated hole punching over the same leading region of a large file
+// filled with random data. Skipped when hole punching is unsupported.
+TEST_F(TestEnv, TestHolePunchBenchmark) {
+  const int kFileSize = 1 * 1024 * 1024 * 1024;
+  const int kHoleSize = 10 * kOneMb;
+  const int kNumRuns = 1000;
+  if (!fallocate_punch_hole_supported_) {
+    LOG(INFO) << "hole punching not supported, skipping test";
+    return;
+  }
+  Random rng(SeedRandom());
+
+  string bench_path = GetTestPath("test");
+  unique_ptr<RWFile> rw_file;
+  ASSERT_OK(env_->NewRWFile(bench_path, &rw_file));
+
+  // One megabyte of random bytes, reused for every write below.
+  uint8_t random_block[kOneMb];
+  RandomString(&random_block, kOneMb, &rng);
+
+  // Lay the random block down back-to-back until the file is full.
+  LOG_TIMING(INFO, Substitute("writing $0 bytes to file", kFileSize)) {
+    Slice block(random_block, kOneMb);
+    int write_offset = 0;
+    while (write_offset < kFileSize) {
+      ASSERT_OK(rw_file->Write(write_offset, block));
+      write_offset += kOneMb;
+    }
+  }
+  LOG_TIMING(INFO, "syncing file") {
+    ASSERT_OK(rw_file->Sync());
+  }
+
+  // The first punch is timed (and synced) separately from the repeats.
+  LOG_TIMING(INFO, Substitute("punching first hole of size $0", kHoleSize)) {
+    ASSERT_OK(rw_file->PunchHole(0, kHoleSize));
+  }
+  LOG_TIMING(INFO, "syncing file") {
+    ASSERT_OK(rw_file->Sync());
+  }
+
+  // The benchmark proper: punch the very same range over and over.
+  LOG_TIMING(INFO, Substitute("repunching $0 holes of size $1",
+                              kNumRuns, kHoleSize)) {
+    for (int run = 0; run != kNumRuns; ++run) {
+      ASSERT_OK(rw_file->PunchHole(0, kHoleSize));
+    }
+  }
+}
+
+// Verifies that Truncate() can both grow and shrink a file, that the size
+// reported by the file and by the Env agree afterwards, and that the bytes
+// read back from the file are all zero.
+TEST_F(TestEnv, TestTruncate) {
+  LOG(INFO) << "Testing Truncate()";
+  string test_path = GetTestPath("test_env_wf");
+  unique_ptr<RWFile> file;
+  ASSERT_OK(env_->NewRWFile(test_path, &file));
+  uint64_t size;
+  ASSERT_OK(file->Size(&size));
+  ASSERT_EQ(0, size);
+
+  // Truncate to 2 MB (up).
+  ASSERT_OK(file->Truncate(kTwoMb));
+  ASSERT_OK(file->Size(&size));
+  ASSERT_EQ(kTwoMb, size);
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(kTwoMb, size);
+
+  // Truncate to 1 MB (down).
+  ASSERT_OK(file->Truncate(kOneMb));
+  ASSERT_OK(file->Size(&size));
+  ASSERT_EQ(kOneMb, size);
+  ASSERT_OK(env_->GetFileSize(test_path, &size));
+  ASSERT_EQ(kOneMb, size);
+
+  ASSERT_OK(file->Close());
+
+  // Read the whole file. Ensure it is all zeroes.
+  unique_ptr<RandomAccessFile> raf;
+  ASSERT_OK(env_->NewRandomAccessFile(test_path, &raf));
+  unique_ptr<uint8_t[]> scratch(new uint8_t[size]);
+  Slice s(scratch.get(), size);
+  ASSERT_OK(raf->Read(0, s));
+  const uint8_t* data = s.data();
+  // The index has the same unsigned type as 'size' to avoid a
+  // signed/unsigned comparison.
+  for (uint64_t i = 0; i < size; i++) {
+    ASSERT_EQ(0, data[i]) << "Not null at position " << i;
+  }
+}
+
+// Write 'size' bytes of data to a file, with a simple pattern stored in it.
+//
+// The byte at offset 'i' is (i * 31) & 0xff. The function uses gtest
+// assertions, so callers must check for fatal failures after calling it
+// (e.g. by wrapping the call in ASSERT_NO_FATAL_FAILURE()).
+static void WriteTestFile(Env* env, const string& path, size_t size) {
+  shared_ptr<WritableFile> wf;
+  ASSERT_OK(env_util::OpenFileForWrite(env, path, &wf));
+  faststring data;
+  data.resize(size);
+  // size_t index: avoids comparing a signed int against data.size().
+  for (size_t i = 0; i < data.size(); i++) {
+    data[i] = (i * 31) & 0xff;
+  }
+  ASSERT_OK(wf->Append(Slice(data)));
+  ASSERT_OK(wf->Close());
+}
+
+// Verifies that RandomAccessFile::Read() returns all of the requested data
+// even when short reads are injected, and that reading past the end of the
+// file yields an EndOfFile status.
+TEST_F(TestEnv, TestReadFully) {
+  SeedRandom();
+  const string kTestPath = GetTestPath("test");
+  const int kFileSize = 64 * 1024;
+  Env* env = Env::Default();
+
+  // The call must be made inside the macro: ASSERT_NO_FATAL_FAILURE() only
+  // detects failures raised while its argument statement is executing.
+  ASSERT_NO_FATAL_FAILURE(WriteTestFile(env, kTestPath, kFileSize));
+
+  // Reopen for read
+  shared_ptr<RandomAccessFile> raf;
+  ASSERT_OK(env_util::OpenFileForRandom(env, kTestPath, &raf));
+
+  const int kReadLength = 10000;
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kReadLength]);
+  Slice s(scratch.get(), kReadLength);
+
+  // Force a short read to half the data length
+  FLAGS_env_inject_short_read_bytes = kReadLength / 2;
+
+  // Verify that Read fully reads the whole requested data.
+  ASSERT_OK(raf->Read(0, s));
+  VerifyTestData(s, 0);
+
+  // Turn short reads off again
+  FLAGS_env_inject_short_read_bytes = 0;
+
+  // Verify that Read fails with an EndOfFile error at EOF.
+  Slice s2(scratch.get(), 200);
+  Status status = raf->Read(kFileSize - 100, s2);
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.IsEndOfFile());
+  ASSERT_STR_CONTAINS(status.ToString(), "EOF");
+}
+
+// Verifies that RWFile::ReadV() fills every result slice even when short
+// reads are injected, and that reading past the end of the file yields an
+// EndOfFile status.
+TEST_F(TestEnv, TestReadVFully) {
+  // Create the file.
+  unique_ptr<RWFile> file;
+  ASSERT_OK(env_->NewRWFile(GetTestPath("foo"), &file));
+
+  // Write to it.
+  const string kTestData = "abcde12345";
+  ASSERT_OK(file->Write(0, kTestData));
+
+  // Setup read parameters. The sizes are compile-time constants so that the
+  // scratch buffers are ordinary arrays rather than variable-length arrays.
+  const size_t size1 = 5;
+  uint8_t scratch1[size1];
+  Slice result1(scratch1, size1);
+  const size_t size2 = 5;
+  uint8_t scratch2[size2];
+  Slice result2(scratch2, size2);
+  vector<Slice> results = { result1, result2 };
+
+  // Force a short read
+  FLAGS_env_inject_short_read_bytes = 3;
+
+  // Verify that Read fully reads the whole requested data.
+  ASSERT_OK(file->ReadV(0, results));
+  ASSERT_EQ(result1, "abcde");
+  ASSERT_EQ(result2, "12345");
+
+  // Turn short reads off again
+  FLAGS_env_inject_short_read_bytes = 0;
+
+  // Verify that Read fails with an EndOfFile error at EOF.
+  Status status = file->ReadV(5, results);
+  ASSERT_FALSE(status.ok());
+  ASSERT_TRUE(status.IsEndOfFile());
+  ASSERT_STR_CONTAINS(status.ToString(), "EOF");
+}
+
+// Verifies that ReadV() copes with more result slices than IOV_MAX, with
+// short reads injected on top.
+TEST_F(TestEnv, TestIOVMax) {
+  Env* env = Env::Default();
+  const string kTestPath = GetTestPath("test");
+
+  const size_t slice_count = IOV_MAX + 42;
+  const size_t slice_size = 5;
+  const size_t data_size = slice_count * slice_size;
+
+  NO_FATALS(WriteTestFile(env, kTestPath, data_size));
+
+  // Open the freshly written file for reading.
+  shared_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_util::OpenFileForRandom(env, kTestPath, &reader));
+
+  // Carve one contiguous buffer into 'slice_count' equally sized slices,
+  // i.e. more slices than IOV_MAX.
+  uint8_t scratch[data_size];
+  vector<Slice> results;
+  for (size_t shift = 0; shift < data_size; shift += slice_size) {
+    results.emplace_back(scratch + shift, slice_size);
+  }
+
+  // Inject short reads as well.
+  FLAGS_env_inject_short_read_bytes = 3;
+
+  // Every byte must still arrive.
+  ASSERT_OK(reader->ReadV(0, results));
+  VerifyTestData(Slice(scratch, data_size), 0);
+}
+
+// Runs the TestAppendV() helper in its different modes. The runs that need
+// preallocation are skipped when fallocate() is not supported.
+TEST_F(TestEnv, TestAppendV) {
+  WritableFileOptions opts;
+  LOG(INFO) << "Testing AppendV() only, NO pre-allocation";
+  ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, false, opts));
+
+  if (!fallocate_supported_) {
+    LOG(INFO) << "fallocate not supported, skipping preallocated runs";
+  } else {
+    LOG(INFO) << "Testing AppendV() only, WITH pre-allocation";
+    ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, true, opts));
+    LOG(INFO) << "Testing AppendV() together with Append() and Read(), WITH pre-allocation";
+    ASSERT_NO_FATAL_FAILURE(TestAppendV(128, 4096, 5, false, true, opts));
+  }
+}
+
+// The path of the running executable must end in the name of this test
+// binary.
+TEST_F(TestEnv, TestGetExecutablePath) {
+  string exe_path;
+  Env* const default_env = Env::Default();
+  ASSERT_OK(default_env->GetExecutablePath(&exe_path));
+  ASSERT_TRUE(HasSuffixString(exe_path, "env-test")) << exe_path;
+}
+
+// A zero-length file can be opened for random access and reports size 0.
+TEST_F(TestEnv, TestOpenEmptyRandomAccessFile) {
+  Env* env = Env::Default();
+  const string empty_path = GetTestPath("test_file");
+  ASSERT_NO_FATAL_FAILURE(WriteTestFile(env, empty_path, 0));
+
+  unique_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env->NewRandomAccessFile(empty_path, &reader));
+
+  uint64_t reported_size;
+  ASSERT_OK(reader->Size(&reported_size));
+  ASSERT_EQ(0, reported_size);
+}
+
+// Opening a file for write with default options succeeds whether or not the
+// file exists, whereas CREATE_NON_EXISTING refuses to touch an existing file.
+TEST_F(TestEnv, TestOverwrite) {
+  const string path = GetTestPath("test_env_wf");
+  shared_ptr<WritableFile> writer;
+
+  // First open: the file does not exist yet and gets created.
+  ASSERT_OK(env_util::OpenFileForWrite(env_, path, &writer));
+
+  // Second open: the file exists and is overwritten.
+  ASSERT_OK(env_util::OpenFileForWrite(env_, path, &writer));
+
+  // Third open: creation-only mode must fail because the file exists.
+  WritableFileOptions create_only_opts;
+  create_only_opts.mode = Env::CREATE_NON_EXISTING;
+  const Status open_status =
+      env_util::OpenFileForWrite(create_only_opts, env_, path, &writer);
+  ASSERT_TRUE(open_status.IsAlreadyPresent());
+}
+
+// Verifies that a file reopened with OPEN_EXISTING keeps its contents and
+// reports their size, and that further appends land after the old data.
+TEST_F(TestEnv, TestReopen) {
+  LOG(INFO) << "Testing reopening behavior";
+  string test_path = GetTestPath("test_env_wf");
+  string first = "The quick brown fox";
+  string second = "jumps over the lazy dog";
+
+  // Create the file and write to it.
+  shared_ptr<WritableFile> writer;
+  ASSERT_OK(env_util::OpenFileForWrite(WritableFileOptions(),
+                                       env_, test_path, &writer));
+  ASSERT_OK(writer->Append(first));
+  ASSERT_EQ(first.length(), writer->Size());
+  ASSERT_OK(writer->Close());
+
+  // Reopen it and append to it.
+  WritableFileOptions reopen_opts;
+  reopen_opts.mode = Env::OPEN_EXISTING;
+  ASSERT_OK(env_util::OpenFileForWrite(reopen_opts,
+                                       env_, test_path, &writer));
+  ASSERT_EQ(first.length(), writer->Size());
+  ASSERT_OK(writer->Append(second));
+  ASSERT_EQ(first.length() + second.length(), writer->Size());
+  ASSERT_OK(writer->Close());
+
+  // Check that the file has both strings.
+  shared_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_util::OpenFileForRandom(env_, test_path, &reader));
+  uint64_t size;
+  ASSERT_OK(reader->Size(&size));
+  ASSERT_EQ(first.length() + second.length(), size);
+  // 'size' is only known at run time, so the buffer lives on the heap
+  // instead of in a (non-standard) variable-length array.
+  unique_ptr<uint8_t[]> scratch(new uint8_t[size]);
+  Slice s(scratch.get(), size);
+  ASSERT_OK(reader->Read(0, s));
+  ASSERT_EQ(first + second, s.ToString());
+}
+
+// IsDirectory() reports true for a directory and false for a regular file.
+TEST_F(TestEnv, TestIsDirectory) {
+  bool is_dir;
+
+  // A freshly created directory.
+  const string dir_path = GetTestPath("a_directory");
+  ASSERT_OK(env_->CreateDir(dir_path));
+  ASSERT_OK(env_->IsDirectory(dir_path, &is_dir));
+  ASSERT_TRUE(is_dir);
+
+  // A freshly created regular file.
+  const string file_path = GetTestPath("not_a_directory");
+  unique_ptr<WritableFile> writer;
+  ASSERT_OK(env_->NewWritableFile(file_path, &writer));
+  ASSERT_OK(env_->IsDirectory(file_path, &is_dir));
+  ASSERT_FALSE(is_dir);
+}
+
+// TestEnv fixture parameterized over the resource limit types exercised
+// below.
+class ResourceLimitTypeTest
+    : public TestEnv,
+      public ::testing::WithParamInterface<Env::ResourceLimitType> {};
+
+INSTANTIATE_TEST_CASE_P(
+    ResourceLimitTypes,
+    ResourceLimitTypeTest,
+    ::testing::Values(Env::ResourceLimitType::OPEN_FILES_PER_PROCESS,
+                      Env::ResourceLimitType::RUNNING_THREADS_PER_EUID));
+
+// Regression test for KUDU-1798.
+//
+// Raising a resource limit must never lower it, and raising it a second time
+// must leave it unchanged.
+TEST_P(ResourceLimitTypeTest, TestIncreaseLimit) {
+  const Env::ResourceLimitType limit_type = GetParam();
+
+  // First increase: the limit either goes up or stays where it was.
+  const int64_t before = env_->GetResourceLimit(limit_type);
+  env_->IncreaseResourceLimit(limit_type);
+  const int64_t after_first = env_->GetResourceLimit(limit_type);
+  ASSERT_GE(after_first, before);
+
+  // Second increase: nothing is left to gain, so the value must not move.
+  env_->IncreaseResourceLimit(limit_type);
+  const int64_t after_second = env_->GetResourceLimit(limit_type);
+  ASSERT_EQ(after_first, after_second);
+}
+
+// Walk callback that records the full path of every visited entry in
+// 'actual'. Dies if the same path is reported twice.
+static Status TestWalkCb(unordered_set<string>* actual,
+                         Env::FileType type,
+                         const string& dirname, const string& basename) {
+  VLOG(1) << type << ":" << dirname << ":" << basename;
+  const string full_path = JoinPathSegments(dirname, basename);
+  InsertOrDie(actual, full_path);
+  return Status::OK();
+}
+
+// Walk callback that ignores its arguments and always reports success.
+static Status NoopTestWalkCb(
+    Env::FileType /*type*/,
+    const string& /*dirname*/,
+    const string& /*basename*/) {
+  return Status::OK();
+}
+
+// Builds a small directory tree and checks that a pre-order Walk() reports
+// exactly the directories and files that were created.
+TEST_F(TestEnv, TestWalk) {
+  // We test with this tree:
+  //
+  // /root/
+  // /root/file_1
+  // /root/file_2
+  // /root/dir_a/file_1
+  // /root/dir_a/file_2
+  // /root/dir_b/file_1
+  // /root/dir_b/file_2
+  // /root/dir_b/dir_c/file_1
+  // /root/dir_b/dir_c/file_2
+  unordered_set<string> expected;
+
+  // Each helper creates the entry on disk and records it as expected.
+  auto make_dir = [&](const string& dir_path) {
+    ASSERT_OK(env_->CreateDir(dir_path));
+    InsertOrDie(&expected, dir_path);
+  };
+  auto make_file = [&](const string& file_path) {
+    unique_ptr<WritableFile> writer;
+    ASSERT_OK(env_->NewWritableFile(file_path, &writer));
+    InsertOrDie(&expected, writer->filename());
+  };
+
+  string root = GetTestPath("root");
+  string subdir_a = JoinPathSegments(root, "dir_a");
+  string subdir_b = JoinPathSegments(root, "dir_b");
+  string subdir_c = JoinPathSegments(subdir_b, "dir_c");
+  string file_one = "file_1";
+  string file_two = "file_2";
+
+  // Parents are listed before their children; every directory receives the
+  // same two files right after it is created.
+  const vector<string> dirs = { root, subdir_a, subdir_b, subdir_c };
+  for (const string& dir : dirs) {
+    NO_FATALS(make_dir(dir));
+    NO_FATALS(make_file(JoinPathSegments(dir, file_one)));
+    NO_FATALS(make_file(JoinPathSegments(dir, file_two)));
+  }
+
+  // Do the walk.
+  unordered_set<string> actual;
+  ASSERT_OK(env_->Walk(root, Env::PRE_ORDER, Bind(&TestWalkCb, &actual)));
+  ASSERT_EQ(expected, actual);
+}
+
+// Walking a path that does not exist must yield an IOError.
+TEST_F(TestEnv, TestWalkNonExistentPath) {
+  const Status walk_status =
+      env_->Walk("/not/a/real/path", Env::PRE_ORDER, Bind(&NoopTestWalkCb));
+  ASSERT_TRUE(walk_status.IsIOError());
+  ASSERT_STR_CONTAINS(walk_status.ToString(), "One or more errors occurred");
+}
+
+// Walking a directory whose mode is 0000 must yield an IOError.
+TEST_F(TestEnv, TestWalkBadPermissions) {
+  const string kTestPath = GetTestPath("asdf");
+  ASSERT_OK(env_->CreateDir(kTestPath));
+
+  // Remember the original mode, then strip every permission bit.
+  struct stat original_stat;
+  PCHECK(stat(kTestPath.c_str(), &original_stat) == 0);
+  PCHECK(chmod(kTestPath.c_str(), 0000) == 0);
+  SCOPED_CLEANUP({
+    // Put the original mode back so that the directory can be removed.
+    PCHECK(chmod(kTestPath.c_str(), original_stat.st_mode) == 0);
+  });
+
+  // With no execute permission on the directory the walk has to fail.
+  const Status walk_status =
+      env_->Walk(kTestPath, Env::PRE_ORDER, Bind(&NoopTestWalkCb));
+  ASSERT_TRUE(walk_status.IsIOError());
+  ASSERT_STR_CONTAINS(walk_status.ToString(), "One or more errors occurred");
+}
+
+// Walk callback that counts its invocations in 'num_calls' and always
+// returns an Aborted status.
+static Status TestWalkErrorCb(int* num_calls,
+                              Env::FileType /*type*/,
+                              const string& /*dirname*/,
+                              const string& /*basename*/) {
+  *num_calls += 1;
+  return Status::Aborted("Returning abort status");
+}
+
+TEST_F(TestEnv, TestWalkCbReturnsError) {
+  string new_dir = GetTestPath("foo");
+  string new_file = "myfile";
+  ASSERT_OK(env_->CreateDir(new_dir));
+  unique_ptr<WritableFile> writer;
+  ASSERT_OK(env_->NewWritableFile(JoinPathSegments(new_dir, new_file), 
&writer));
+  int num_calls = 0;
+  ASSERT_TRUE(env_->Walk(new_dir, Env::PRE_ORDER,
+                         Bind(&TestWalkErrorCb, &num_calls)).IsIOError());
+
+  // Once for the directory and once for the file inside it.
+  ASSERT_EQ(2, num_calls);
+}
+
+// Checks the number of Glob() matches for several patterns against a fixed
+// set of file names.
+TEST_F(TestEnv, TestGlob) {
+  const string glob_dir = GetTestPath("glob");
+  ASSERT_OK(env_->CreateDir(glob_dir));
+
+  // Files to create inside the directory.
+  const vector<string> filenames = { "fuzz", "fuzzy", "fuzzyiest", "buzz" };
+  // Pattern -> number of the above files it is expected to match.
+  const vector<pair<string, size_t>> matchers = {
+    { "file", 0 },
+    { "fuzz", 1 },
+    { "fuzz*", 3 },
+    { "?uzz", 2 },
+  };
+
+  for (const string& filename : filenames) {
+    unique_ptr<WritableFile> created;
+    ASSERT_OK(env_->NewWritableFile(JoinPathSegments(glob_dir, filename), &created));
+  }
+
+  for (const pair<string, size_t>& matcher : matchers) {
+    const string& pattern = matcher.first;
+    const size_t expected_count = matcher.second;
+    SCOPED_TRACE(Substitute("pattern: $0, expected matches: $1",
+                            pattern, expected_count));
+    vector<string> matches;
+    ASSERT_OK(env_->Glob(JoinPathSegments(glob_dir, pattern), &matches));
+    ASSERT_EQ(expected_count, matches.size());
+  }
+}
+
+// Test that the status returned when 'glob' fails with a permission
+// error is reasonable.
+//
+// NOTE(review): presumably this cannot pass when run as root, since root is
+// not subject to the directory's permission bits -- confirm.
+TEST_F(TestEnv, TestGlobPermissionDenied) {
+  string dir = GetTestPath("glob");
+  ASSERT_OK(env_->CreateDir(dir));
+  // Check the chmod() calls: if they silently failed, the test would either
+  // exercise nothing or leave behind a directory that cannot be cleaned up.
+  PCHECK(chmod(dir.c_str(), 0000) == 0);
+  SCOPED_CLEANUP({
+    // Restore access so the directory can be deleted afterwards.
+    PCHECK(chmod(dir.c_str(), 0700) == 0);
+  });
+  vector<string> matches;
+  Status s = env_->Glob(JoinPathSegments(dir, "*"), &matches);
+  ASSERT_STR_MATCHES(s.ToString(), "IO error: glob failed for /.*: Permission denied");
+}
+
+// GetBlockSize() returns NotFound for a missing path and a positive block
+// size for both a directory and a regular file.
+TEST_F(TestEnv, TestGetBlockSize) {
+  uint64_t block_size;
+
+  // A path that does not exist.
+  const Status missing_status = env_->GetBlockSize("does_not_exist", &block_size);
+  ASSERT_TRUE(missing_status.IsNotFound());
+
+  // A directory.
+  ASSERT_OK(env_->GetBlockSize(".", &block_size));
+  ASSERT_GT(block_size, 0);
+
+  // A regular file.
+  const string file_path = GetTestPath("foo");
+  unique_ptr<WritableFile> writer;
+  ASSERT_OK(env_->NewWritableFile(file_path, &writer));
+  ASSERT_OK(env_->GetBlockSize(file_path, &block_size));
+  ASSERT_GT(block_size, 0);
+}
+
+// Verifies that a file's modification time advances after the file is
+// written to.
+TEST_F(TestEnv, TestGetFileModifiedTime) {
+  string path = GetTestPath("mtime");
+  unique_ptr<WritableFile> writer;
+  ASSERT_OK(env_->NewWritableFile(path, &writer));
+
+  int64_t initial_time;
+  ASSERT_OK(env_->GetFileModifiedTime(writer->filename(), &initial_time));
+
+  // HFS has 1 second mtime granularity.
+  AssertEventually([&] {
+    int64_t after_time;
+    // Check the write and the sync: if either failed silently, the mtime
+    // comparison below would fail without pointing at the real cause.
+    ASSERT_OK(writer->Append(" "));
+    ASSERT_OK(writer->Sync());
+    ASSERT_OK(env_->GetFileModifiedTime(writer->filename(), &after_time));
+    ASSERT_LT(initial_time, after_time);
+  }, MonoDelta::FromSeconds(5));
+  NO_PENDING_FATALS();
+}
+
+// Exercises RWFile: positional writes and reads, vectored reads, writes past
+// the end of the file, and the CREATE_NON_EXISTING / OPEN_EXISTING modes.
+//
+// All scratch buffers whose size is only known at run time are allocated on
+// the heap rather than as (non-standard) variable-length arrays.
+TEST_F(TestEnv, TestRWFile) {
+  // Create the file.
+  unique_ptr<RWFile> file;
+  ASSERT_OK(env_->NewRWFile(GetTestPath("foo"), &file));
+
+  // Write to it.
+  const string kTestData = "abcde";
+  ASSERT_OK(file->Write(0, kTestData));
+
+  // Read from it.
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kTestData.length()]);
+  Slice result(scratch.get(), kTestData.length());
+  ASSERT_OK(file->Read(0, result));
+  ASSERT_EQ(result, kTestData);
+  uint64_t sz;
+  ASSERT_OK(file->Size(&sz));
+  ASSERT_EQ(kTestData.length(), sz);
+
+  // Read into multiple buffers
+  const size_t size1 = 3;
+  uint8_t scratch1[size1];
+  Slice result1(scratch1, size1);
+  const size_t size2 = 2;
+  uint8_t scratch2[size2];
+  Slice result2(scratch2, size2);
+  vector<Slice> results = { result1, result2 };
+  ASSERT_OK(file->ReadV(0, results));
+  ASSERT_EQ(result1, "abc");
+  ASSERT_EQ(result2, "de");
+
+  // Write past the end of the file and rewrite some of the interior.
+  ASSERT_OK(file->Write(kTestData.length() * 2, kTestData));
+  ASSERT_OK(file->Write(kTestData.length(), kTestData));
+  ASSERT_OK(file->Write(1, kTestData));
+  const string kNewTestData = "aabcdebcdeabcde";
+  unique_ptr<uint8_t[]> scratch3(new uint8_t[kNewTestData.length()]);
+  Slice result3(scratch3.get(), kNewTestData.length());
+  ASSERT_OK(file->Read(0, result3));
+
+  // Retest.
+  ASSERT_EQ(result3, kNewTestData);
+  ASSERT_OK(file->Size(&sz));
+  ASSERT_EQ(kNewTestData.length(), sz);
+
+  // Make sure we can't overwrite it.
+  RWFileOptions opts;
+  opts.mode = Env::CREATE_NON_EXISTING;
+  ASSERT_TRUE(env_->NewRWFile(opts, GetTestPath("foo"), &file).IsAlreadyPresent());
+
+  // Reopen it without truncating the existing data.
+  opts.mode = Env::OPEN_EXISTING;
+  ASSERT_OK(env_->NewRWFile(opts, GetTestPath("foo"), &file));
+  unique_ptr<uint8_t[]> scratch4(new uint8_t[kNewTestData.length()]);
+  Slice result4(scratch4.get(), kNewTestData.length());
+  ASSERT_OK(file->Read(0, result4));
+  ASSERT_EQ(result4, kNewTestData);
+}
+
+// Canonicalize() collapses "." components and redundant or trailing slashes,
+// and returns NotFound for a path that does not exist.
+TEST_F(TestEnv, TestCanonicalize) {
+  // Different spellings of the test directory itself.
+  const vector<string> synonyms = {
+    GetTestPath("."),
+    GetTestPath("./."),
+    GetTestPath(".//./"),
+  };
+  for (const string& synonym : synonyms) {
+    string canonical;
+    ASSERT_OK(env_->Canonicalize(synonym, &canonical));
+    ASSERT_EQ(test_dir_, canonical);
+  }
+
+  // A trailing slash on an existing directory is dropped.
+  const string dir = GetTestPath("some_dir");
+  ASSERT_OK(env_->CreateDir(dir));
+  string canonical_dir;
+  ASSERT_OK(env_->Canonicalize(dir + "/", &canonical_dir));
+  ASSERT_EQ(dir, canonical_dir);
+
+  // A child that was never created cannot be canonicalized.
+  const Status missing_status = env_->Canonicalize(dir + "/bar", nullptr);
+  ASSERT_TRUE(missing_status.IsNotFound());
+}
+
+TEST_F(TestEnv, TestGetTotalRAMBytes) {
+  int64_t ram = 0;
+  ASSERT_OK(env_->GetTotalRAMBytes(&ram));
+
+  // The value is machine-dependent, so we can only check that it is positive.
+  ASSERT_GT(ram, 0);
+}
+
+// Test that env_util::CopyFile() copies all the bytes properly.
+TEST_F(TestEnv, TestCopyFile) {
+  string orig_path = GetTestPath("test");
+  string copy_path = orig_path + ".copy";
+  const int kFileSize = 1024 * 1024 + 11; // 1 MiB plus 11 bytes, i.e. an odd number of bytes.
+
+  Env* env = Env::Default();  // Note: uses the default Env rather than the fixture's env_.
+  NO_FATALS(WriteTestFile(env, orig_path, kFileSize));
+  ASSERT_OK(env_util::CopyFile(env, orig_path, copy_path, 
WritableFileOptions()));
+  unique_ptr<RandomAccessFile> copy;
+  ASSERT_OK(env->NewRandomAccessFile(copy_path, &copy));
+  NO_FATALS(ReadAndVerifyTestData(copy.get(), 0, kFileSize));  // Check the copy, not the original.
+}
+
+// Simple regression test for NewTempRWFile(): creates, closes and deletes one temporary file.
+TEST_F(TestEnv, TestTempRWFile) {
+  string tmpl = "foo.XXXXXX";  // Per env.h, the trailing "XXXXXX" is replaced to make the name unique.
+  string path;
+  unique_ptr<RWFile> file;
+
+  ASSERT_OK(env_->NewTempRWFile(RWFileOptions(), tmpl, &path, &file));
+  ASSERT_NE(path, tmpl);  // The created name must differ from the template...
+  ASSERT_EQ(0, path.find("foo."));  // ...but must still start with the template's prefix.
+  ASSERT_OK(file->Close());
+  ASSERT_OK(env_->DeleteFile(path));
+}
+
+// Test that writing data to disk makes SpaceInfo.free_bytes drop by at least kFileSizeBytes.
+TEST_F(TestEnv, TestGetSpaceInfoFreeBytes) {
+  const string kDataDir = GetTestPath("parent");
+  const string kTestFilePath = JoinPathSegments(kDataDir, "testfile");
+  const int kFileSizeBytes = 256;
+  ASSERT_OK(env_->CreateDir(kDataDir));
+
+  // Retry the whole check in case concurrent tests are modifying the
+  // filesystem and skewing the free space measurements.
+  ASSERT_EVENTUALLY([&] {
+    if (env_->FileExists(kTestFilePath)) {
+      ASSERT_OK(env_->DeleteFile(kTestFilePath)); // Clean up the previous 
iteration.
+    }
+    SpaceInfo before_space_info;
+    ASSERT_OK(env_->GetSpaceInfo(kDataDir, &before_space_info));
+
+    NO_FATALS(WriteTestFile(env_, kTestFilePath, kFileSizeBytes));
+
+    SpaceInfo after_space_info;
+    ASSERT_OK(env_->GetSpaceInfo(kDataDir, &after_space_info));
+    ASSERT_GE(before_space_info.free_bytes - after_space_info.free_bytes, 
kFileSizeBytes);
+  });
+}
+
+// Basic sanity check for GetSpaceInfo(): capacity is positive and bounds the free space.
+TEST_F(TestEnv, TestGetSpaceInfoBasicInvariants) {
+  string path = GetTestDataDirectory();
+  SpaceInfo space_info;
+  ASSERT_OK(env_->GetSpaceInfo(path, &space_info));
+  ASSERT_GT(space_info.capacity_bytes, 0);
+  ASSERT_LE(space_info.free_bytes, space_info.capacity_bytes);  // Free space can't exceed capacity.
+  VLOG(1) << "Path " << path << " has capacity "
+          << HumanReadableNumBytes::ToString(space_info.capacity_bytes)
+          << " (" << HumanReadableNumBytes::ToString(space_info.free_bytes) << 
" free)";
+}
+
+TEST_F(TestEnv, TestChangeDir) {  // Moves the working directory: "/" -> test_dir_ -> original.
+  string orig_dir;
+  ASSERT_OK(env_->GetCurrentWorkingDir(&orig_dir));  // Saved so it can be restored at the end.
+
+  string cwd;
+  ASSERT_OK(env_->ChangeDir("/"));
+  ASSERT_OK(env_->GetCurrentWorkingDir(&cwd));
+  ASSERT_EQ("/", cwd);
+
+  ASSERT_OK(env_->ChangeDir(test_dir_));
+  ASSERT_OK(env_->GetCurrentWorkingDir(&cwd));
+  ASSERT_EQ(test_dir_, cwd);
+
+  ASSERT_OK(env_->ChangeDir(orig_dir));  // Restore the original working directory.
+  ASSERT_OK(env_->GetCurrentWorkingDir(&cwd));
+  ASSERT_EQ(orig_dir, cwd);
+}
+
+TEST_F(TestEnv, TestGetExtentMap) {  // Exercises RWFile::GetExtentMap() before and after a hole punch.
+  // In order to force filesystems that use delayed allocation to write out the
+  // extents, we must Sync() after the file is done growing, and that should
+  // trigger a real fsync() to the filesystem.
+  FLAGS_never_fsync = false;
+
+  const string kTestFilePath = GetTestPath("foo");
+  const int kFileSizeBytes = 1024*1024;
+
+  // Create a test file of a particular size.
+  unique_ptr<RWFile> f;
+  ASSERT_OK(env_->NewRWFile(kTestFilePath, &f));
+  ASSERT_OK(f->PreAllocate(0, kFileSizeBytes, RWFile::CHANGE_FILE_SIZE));
+  ASSERT_OK(f->Sync());
+
+  // The number and distribution of extents differs depending on the
+  // filesystem; this just provides coverage of the code path.
+  RWFile::ExtentMap extents;
+  Status s = f->GetExtentMap(&extents);
+  if (s.IsNotSupported()) {
+    LOG(INFO) << "GetExtentMap() not supported, skipping test";
+    return;
+  }
+  ASSERT_OK(s);
+  SCOPED_TRACE(extents);
+  int num_extents = extents.size();  // Baseline, compared against after the hole punch.
+  ASSERT_GT(num_extents, 0) <<
+      "There should have been at least one extent in the file";
+
+  uint64_t fs_block_size;
+  ASSERT_OK(env_->GetBlockSize(kTestFilePath, &fs_block_size));
+
+  // Look for an extent to punch. We want an extent that's at least three times
+  // the block size so that we can punch out the "middle" fs block and thus
+  // split the extent in half.
+  uint64_t found_offset = 0;  // 0 doubles as "no suitable extent found".
+  for (const auto& e : extents) {  // NOTE(review): e.first looks like the offset, e.second the length.
+    if (e.second >= (fs_block_size * 3)) {
+      found_offset = e.first + fs_block_size;  // Start one fs block into the extent.
+      break;
+    }
+  }
+  ASSERT_GT(found_offset, 0) << "Couldn't find extent to split";
+
+  // Punch out a hole and split the extent.
+  s = f->PunchHole(found_offset, fs_block_size);
+  if (s.IsNotSupported()) {
+    LOG(INFO) << "PunchHole() not supported, skipping this part of the test";
+    return;
+  }
+  ASSERT_OK(s);
+  ASSERT_OK(f->Sync());
+
+  // Test the extent map; there should be one more extent.
+  ASSERT_OK(f->GetExtentMap(&extents));
+  ASSERT_EQ(num_extents + 1, extents.size()) <<
+      "Punching a hole should have increased the number of extents by one";
+}
+
+TEST_F(TestEnv, TestInjectEIO) {
+  // Create two files to inject failures into.
+  FLAGS_crash_on_eio = false;  // NOTE(review): presumably so injected EIOs come back as Statuses.
+  const string kTestRWPath1 = GetTestPath("test_env_rw_file1");
+  unique_ptr<RWFile> rw1;
+  ASSERT_OK(env_->NewRWFile(kTestRWPath1, &rw1));
+
+  const string kTestRWPath2 = GetTestPath("test_env_rw_file2");
+  unique_ptr<RWFile> rw2;
+  ASSERT_OK(env_->NewRWFile(kTestRWPath2, &rw2));
+
+  // Inject EIOs to all operations that might result in an EIO, without
+  // specifying a glob pattern (not specifying the glob pattern will inject
+  // EIOs wherever possible by default).
+  FLAGS_env_inject_eio = 1.0;
+  uint64_t size;
+  Status s = rw1->Size(&size);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+  s = rw2->Size(&size);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+
+  // Specify and verify that both files should fail by matching glob patterns
+  // to each of their literal paths.
+  FLAGS_env_inject_eio_globs = Substitute("$0,$1", kTestRWPath1, kTestRWPath2);
+  Slice result;
+  s = rw1->Read(0, result);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+  s = rw2->Size(&size);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+
+  // Inject EIOs to all operations that might result in an EIO across paths,
+  // specified with a glob pattern.
+  FLAGS_env_inject_eio_globs = "*";
+  Slice data("data");
+  s = rw1->Write(0, data);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+  s = rw2->Size(&size);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+
+  // Specify and verify that one of the files should fail by matching a glob
+  // pattern of one of the literal paths.
+  FLAGS_env_inject_eio_globs = kTestRWPath1;
+  s = rw1->Size(&size);
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+  ASSERT_OK(rw2->Write(0, data));  // The second file is not matched, so it must keep working.
+
+  // Specify the directory of one of the files and ensure that fails.
+  FLAGS_env_inject_eio_globs = JoinPathSegments(DirName(kTestRWPath2), "**");
+  s = rw2->Sync();
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+
+  // Specify a directory and check that failed directory operations are caught.
+  FLAGS_env_inject_eio_globs = DirName(kTestRWPath2);
+  s = env_->SyncDir(DirName(kTestRWPath2));
+  ASSERT_TRUE(s.IsIOError());
+  ASSERT_STR_CONTAINS(s.ToString(), "INJECTED FAILURE");
+
+  // Specify a glob that matches neither file; both Close() calls must succeed.
+  FLAGS_env_inject_eio_globs = "neither_path";
+  ASSERT_OK(rw1->Close());
+  ASSERT_OK(rw2->Close());
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/env.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env.cc b/be/src/kudu/util/env.cc
new file mode 100644
index 0000000..90755e0
--- /dev/null
+++ b/be/src/kudu/util/env.cc
@@ -0,0 +1,93 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "kudu/util/env.h"
+
+#include <memory>
+
+#include <glog/logging.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+
+Env::~Env() {  // Empty body; out-of-line definition of the virtual destructor declared in env.h.
+}
+
+SequentialFile::~SequentialFile() {  // Empty body; declared virtual in env.h.
+}
+
+RandomAccessFile::~RandomAccessFile() {  // Empty body; declared virtual in env.h.
+}
+
+WritableFile::~WritableFile() {  // Empty body; declared virtual in env.h.
+}
+
+RWFile::~RWFile() {  // Empty body; declared virtual in env.h.
+}
+
+FileLock::~FileLock() {  // Empty body.
+}
+
+static Status DoWriteStringToFile(Env* env, const Slice& data,
+                                  const std::string& fname,
+                                  bool should_sync) {  // Writes 'data' to 'fname'; Sync()s iff should_sync.
+  unique_ptr<WritableFile> file;
+  Status s = env->NewWritableFile(fname, &file);  // Per env.h, replaces any existing file of that name.
+  if (!s.ok()) {
+    return s;
+  }
+  s = file->Append(data);
+  if (s.ok() && should_sync) {
+    s = file->Sync();
+  }
+  if (s.ok()) {  // Close() is only called explicitly when every earlier step succeeded.
+    s = file->Close();
+  }
+  file.reset();  // Will auto-close if we did not close above
+  if (!s.ok()) {  // Append/Sync/Close failed: try to remove the file; 's' is still what gets returned.
+    WARN_NOT_OK(env->DeleteFile(fname),
+                "Failed to delete partially-written file " + fname);
+  }
+  return s;
+}
+
+// TODO: move these utils into env_util
+Status WriteStringToFile(Env* env, const Slice& data,
+                         const std::string& fname) {
+  return DoWriteStringToFile(env, data, fname, false);  // should_sync=false: no Sync() before Close().
+}
+
+Status WriteStringToFileSync(Env* env, const Slice& data,
+                             const std::string& fname) {
+  return DoWriteStringToFile(env, data, fname, true);  // should_sync=true: Sync() before Close().
+}
+
+Status ReadFileToString(Env* env, const std::string& fname, faststring* data) {
+  data->clear();  // Previous contents of '*data' are discarded, even if the open below fails.
+  unique_ptr<SequentialFile> file;
+  Status s = env->NewSequentialFile(fname, &file);
+  if (!s.ok()) {
+    return s;
+  }
+  static const int kBufferSize = 8192;  // Number of bytes requested per Read() call.
+  unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]);
+  while (true) {
+    Slice fragment(scratch.get(), kBufferSize);  // Reset each iteration; Read() updates the slice.
+    s = file->Read(&fragment);
+    if (!s.ok()) {
+      break;  // '*data' keeps whatever was appended before the error.
+    }
+    data->append(fragment.data(), fragment.size());
+    if (fragment.empty()) {  // A zero-length read ends the loop (taken to mean end of file).
+      break;
+    }
+  }
+  return s;
+}
+
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/env.h b/be/src/kudu/util/env.h
new file mode 100644
index 0000000..2822994
--- /dev/null
+++ b/be/src/kudu/util/env.h
@@ -0,0 +1,681 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// An Env is an interface used by the kudu implementation to access
+// operating system functionality like the filesystem etc.  Callers
+// may wish to provide a custom Env object when opening a database to
+// get fine-grained control; e.g., to rate limit file system operations.
+//
+// All Env implementations are safe for concurrent access from
+// multiple threads without any external synchronization.
+
+#ifndef STORAGE_LEVELDB_INCLUDE_ENV_H_
+#define STORAGE_LEVELDB_INCLUDE_ENV_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <iosfwd>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/callback_forward.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class faststring;
+class FileLock;
+class RandomAccessFile;
+class RWFile;
+class SequentialFile;
+class Slice;
+class WritableFile;
+
+struct RandomAccessFileOptions;
+struct RWFileOptions;
+struct WritableFileOptions;
+
+template <typename T>
+class ArrayView;
+
+// Filesystem space statistics, filled in by Env::GetSpaceInfo().
+struct SpaceInfo {
+  int64_t capacity_bytes; // Capacity of a filesystem, in bytes.
+  int64_t free_bytes;     // Bytes available to non-privileged processes.
+};
+
+class Env {  // Abstract interface to operating system functionality such as the filesystem.
+ public:
+  // Governs if/how the file is created.
+  //
+  // enum value                      | file exists       | file does not exist
+  // --------------------------------+-------------------+--------------------
+  // CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates
+  // CREATE_NON_EXISTING             | fails             | creates
+  // OPEN_EXISTING                   | opens             | fails
+  enum CreateMode {
+    CREATE_IF_NON_EXISTING_TRUNCATE,
+    CREATE_NON_EXISTING,
+    OPEN_EXISTING
+  };
+
+  Env() { }
+  virtual ~Env();
+
+  // Return a default environment suitable for the current operating
+  // system.  Sophisticated users may wish to provide their own Env
+  // implementation instead of relying on this default environment.
+  //
+  // The result of Default() belongs to kudu and must never be deleted.
+  static Env* Default();
+
+  // Create a brand new sequentially-readable file with the specified name.
+  // On success, stores a pointer to the new file in *result and returns OK.
+  // On failure stores NULL in *result and returns non-OK.  If the file does
+  // not exist, returns a non-OK status.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewSequentialFile(const std::string& fname,
+                                   std::unique_ptr<SequentialFile>* result) = 
0;
+
+  // Create a brand new random access read-only file with the
+  // specified name.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.  If the file does not exist, returns a non-OK
+  // status.
+  //
+  // The returned file may be concurrently accessed by multiple threads.
+  virtual Status NewRandomAccessFile(const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* 
result) = 0;
+
+  // Like the previous NewRandomAccessFile, but allows options to be specified.
+  virtual Status NewRandomAccessFile(const RandomAccessFileOptions& opts,
+                                     const std::string& fname,
+                                     std::unique_ptr<RandomAccessFile>* 
result) = 0;
+
+  // Create an object that writes to a new file with the specified
+  // name.  Deletes any existing file with the same name and creates a
+  // new file.  On success, stores a pointer to the new file in
+  // *result and returns OK.  On failure stores NULL in *result and
+  // returns non-OK.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  virtual Status NewWritableFile(const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+
+  // Like the previous NewWritableFile, but allows options to be
+  // specified.
+  virtual Status NewWritableFile(const WritableFileOptions& opts,
+                                 const std::string& fname,
+                                 std::unique_ptr<WritableFile>* result) = 0;
+
+  // Creates a new WritableFile provided the name_template parameter.
+  // The last six characters of name_template must be "XXXXXX" and these are
+  // replaced with a string that makes the filename unique.
+  // The resulting created filename, if successful, will be stored in the
+  // created_filename out parameter.
+  // The file is created with permissions 0600, that is, read plus write for
+  // owner only. The implementation will create the file in a secure manner,
+  // and will return an error Status if it is unable to open the file.
+  virtual Status NewTempWritableFile(const WritableFileOptions& opts,
+                                     const std::string& name_template,
+                                     std::string* created_filename,
+                                     std::unique_ptr<WritableFile>* result) = 
0;
+
+  // Creates a new readable and writable file. If a file with the same name
+  // already exists on disk, it is deleted.
+  //
+  // Some of the methods of the new file may be accessed concurrently,
+  // while others are only safe for access by one thread at a time.
+  virtual Status NewRWFile(const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Like the previous NewRWFile, but allows options to be specified.
+  virtual Status NewRWFile(const RWFileOptions& opts,
+                           const std::string& fname,
+                           std::unique_ptr<RWFile>* result) = 0;
+
+  // Same as above for NewTempWritableFile(), but for an RWFile.
+  virtual Status NewTempRWFile(const RWFileOptions& opts,
+                               const std::string& name_template,
+                               std::string* created_filename,
+                               std::unique_ptr<RWFile>* res) = 0;
+
+  // Returns true iff the named file exists.
+  virtual bool FileExists(const std::string& fname) = 0;
+
+  // Store in *result the names of the children of the specified directory.
+  // The names are relative to "dir".
+  // Original contents of *result are dropped.
+  virtual Status GetChildren(const std::string& dir,
+                             std::vector<std::string>* result) = 0;
+
+  // Delete the named file.
+  virtual Status DeleteFile(const std::string& fname) = 0;
+
+  // Create the specified directory.
+  virtual Status CreateDir(const std::string& dirname) = 0;
+
+  // Delete the specified directory.
+  virtual Status DeleteDir(const std::string& dirname) = 0;
+
+  // Return the current working directory.
+  virtual Status GetCurrentWorkingDir(std::string* cwd) const = 0;
+
+  // Change the current working directory.
+  virtual Status ChangeDir(const std::string& dest) = 0;
+
+  // Synchronize the entry for a specific directory.
+  virtual Status SyncDir(const std::string& dirname) = 0;
+
+  // Recursively delete the specified directory.
+  // This should operate safely, not following any symlinks, etc.
+  virtual Status DeleteRecursively(const std::string &dirname) = 0;
+
+  // Store the logical size of fname in *file_size.
+  virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 
0;
+
+  // Store the physical size of fname in *file_size.
+  //
+  // This differs from GetFileSize() in that it returns the actual amount
+  // of space consumed by the file, not the user-facing file size.
+  virtual Status GetFileSizeOnDisk(const std::string& fname, uint64_t* 
file_size) = 0;
+
+  // Walk 'root' recursively, looking up the amount of space used by each file
+  // as reported by GetFileSizeOnDisk(), storing the grand total in 
'bytes_used'.
+  virtual Status GetFileSizeOnDiskRecursively(const std::string& root, 
uint64_t* bytes_used) = 0;
+
+  // Returns the modified time of the file in microseconds.
+  //
+  // The timestamp is a 'system' timestamp, and is not guaranteed to be
+  // monotonic, or have any other consistency properties. The granularity of 
the
+  // timestamp is not guaranteed, and may be as high as 1 second on some
+  // platforms. The timestamp is not guaranteed to be anchored to any 
particular
+  // epoch.
+  virtual Status GetFileModifiedTime(const std::string& fname, int64_t* 
timestamp) = 0;
+
+  // Store the block size of the filesystem where fname resides in
+  // *block_size. fname must exist but it may be a file or a directory.
+  virtual Status GetBlockSize(const std::string& fname, uint64_t* block_size) 
= 0;
+
+  // Determine the capacity and number of bytes free on the filesystem
+  // specified by 'path'. "Free space" accounting on the underlying filesystem
+  // may be more coarse than single bytes.
+  virtual Status GetSpaceInfo(const std::string& path, SpaceInfo* space_info) 
= 0;
+
+  // Rename file src to target.
+  virtual Status RenameFile(const std::string& src,
+                            const std::string& target) = 0;
+
+  // Lock the specified file.  Used to prevent concurrent access to
+  // the same db by multiple processes.  On failure, stores NULL in
+  // *lock and returns non-OK.
+  //
+  // On success, stores a pointer to the object that represents the
+  // acquired lock in *lock and returns OK.  The caller should call
+  // UnlockFile(*lock) to release the lock.  If the process exits,
+  // the lock will be automatically released.
+  //
+  // If somebody else already holds the lock, finishes immediately
+  // with a failure.  I.e., this call does not wait for existing locks
+  // to go away.
+  //
+  // May create the named file if it does not already exist.
+  virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
+
+  // Release the lock acquired by a previous successful call to LockFile.
+  // REQUIRES: lock was returned by a successful LockFile() call
+  // REQUIRES: lock has not already been unlocked.
+  virtual Status UnlockFile(FileLock* lock) = 0;
+
+  // *path is set to a temporary directory that can be used for testing. It may
+  // or may not have just been created. The directory may or may not differ
+  // between runs of the same process, but subsequent calls will return the
+  // same directory.
+  virtual Status GetTestDirectory(std::string* path) = 0;
+
+  // Returns the number of micro-seconds since some fixed point in time. Only
+  // useful for computing deltas of time.
+  virtual uint64_t NowMicros() = 0;
+
+  // Sleep/delay the thread for the prescribed number of micro-seconds.
+  virtual void SleepForMicroseconds(int micros) = 0;
+
+  // Get caller's thread id.
+  virtual uint64_t gettid() = 0;
+
+  // Return the full path of the currently running executable.
+  virtual Status GetExecutablePath(std::string* path) = 0;
+
+  // Checks if the file is a directory. Returns an error if it doesn't
+  // exist, otherwise writes true or false into 'is_dir' appropriately.
+  virtual Status IsDirectory(const std::string& path, bool* is_dir) = 0;
+
+  // The kind of file found during a walk. Note that symbolic links are
+  // reported as FILE_TYPE.
+  enum FileType {
+    DIRECTORY_TYPE,
+    FILE_TYPE,
+  };
+
+  // Called for each file/directory in the walk.
+  //
+  // The first argument is the type of file.
+  // The second is the dirname of the file.
+  // The third is the basename of the file.
+  //
+  // Returning an error won't halt the walk, but it will cause it to return
+  // with an error status when it's done.
+  typedef Callback<Status(FileType, const std::string&, const std::string&)> 
WalkCallback;
+
+  // Whether to walk directories in pre-order or post-order.
+  enum DirectoryOrder {
+    PRE_ORDER,
+    POST_ORDER,
+  };
+
+  // Walk the filesystem subtree from 'root' down, invoking 'cb' for each
+  // file or directory found, including 'root'.
+  //
+  // The walk will not cross filesystem boundaries. It won't change the
+  // working directory, nor will it follow symbolic links.
+  virtual Status Walk(const std::string& root,
+                      DirectoryOrder order,
+                      const WalkCallback& cb) = 0;
+
+  // Finds paths on the filesystem matching a pattern.
+  //
+  // The found pathnames are added to the 'paths' vector. If no pathnames are
+  // found matching the pattern, no paths are added to the vector and an OK
+  // status is returned.
+  virtual Status Glob(const std::string& path_pattern, 
std::vector<std::string>* paths) = 0;
+
+  // Canonicalize 'path' by applying the following conversions:
+  // - Converts a relative path into an absolute one using the cwd.
+  // - Converts '.' and '..' references.
+  // - Resolves all symbolic links.
+  //
+  // All directory entries in 'path' must exist on the filesystem.
+  virtual Status Canonicalize(const std::string& path, std::string* result) = 
0;
+
+  // Gets the total amount of RAM installed on this machine.
+  virtual Status GetTotalRAMBytes(int64_t* ram) = 0;
+
+  enum class ResourceLimitType {
+    // The maximum number of file descriptors that this process can have open
+    // at any given time.
+    //
+    // Corresponds to RLIMIT_NOFILE on UNIX platforms.
+    OPEN_FILES_PER_PROCESS,
+
+    // The maximum number of threads (or processes) that this process's
+    // effective user ID may have spawned and running at any given time.
+    //
+    // Corresponds to RLIMIT_NPROC on UNIX platforms.
+    RUNNING_THREADS_PER_EUID,
+  };
+
+  // Gets the process' current limit for the given resource type.
+  //
+  // On UNIX platforms, this is equivalent to the resource's soft limit.
+  virtual uint64_t GetResourceLimit(ResourceLimitType t) = 0;
+
+  // Increases the resource limit by as much as possible.
+  //
+  // On UNIX platforms, this means increasing the resource's soft limit (the
+  // limit actually enforced by the kernel) to be equal to the hard limit.
+  virtual void IncreaseResourceLimit(ResourceLimitType t) = 0;
+
+  // Checks whether the given path resides on an ext2, ext3, or ext4
+  // filesystem.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsOnExtFilesystem(const std::string& path, bool* result) = 0;
+
+  // Checks whether the given path resides on an xfs filesystem.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsOnXfsFilesystem(const std::string& path, bool* result) = 0;
+
+  // Gets the kernel release string for this machine.
+  virtual std::string GetKernelRelease() = 0;
+
+  // Ensure that the file with the given path has permissions which adhere
+  // to the current configured umask (from flags.h). If the permissions are
+  // wider than the current umask, then a warning is logged and the permissions
+  // are fixed.
+  //
+  // Returns a bad Status if the file does not exist or the permissions cannot
+  // be changed.
+  virtual Status EnsureFileModeAdheresToUmask(const std::string& path) = 0;
+
+  // Checks whether the given path has world-readable permissions.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsFileWorldReadable(const std::string& path, bool* result) = 
0;
+
+  // Special string injected into file-growing operations' random failures
+  // (if enabled).
+  //
+  // Only useful for tests.
+  static const char* const kInjectedFailureStatusMsg;
+
+ private:
+  // No copying allowed
+  Env(const Env&);
+  void operator=(const Env&);
+};
+
+// A file abstraction for reading sequentially through a file
+class SequentialFile {
+ public:
+  SequentialFile() { }
+  virtual ~SequentialFile();
+
+  // Read up to "result.size" bytes from the file.
+  // Sets "result.data" to the data that was read.
+  //
+  // If an error was encountered, returns a non-OK status
+  // and the contents of "result" are invalid.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Read(Slice* result) = 0;
+
+  // Skip "n" bytes from the file. This is guaranteed to be no
+  // slower than reading the same data, but may be faster.
+  //
+  // If end of file is reached, skipping will stop at the end of the
+  // file, and Skip will return OK.
+  //
+  // REQUIRES: External synchronization
+  virtual Status Skip(uint64_t n) = 0;
+
+  // Returns the filename provided when the SequentialFile was constructed.
+  virtual const std::string& filename() const = 0;
+};
+
+// A file abstraction for randomly reading the contents of a file.
+class RandomAccessFile {
+ public:
+  RandomAccessFile() { }
+  virtual ~RandomAccessFile();
+
+  // Read "result.size" bytes from the file starting at "offset".
+  // Copies the resulting data into "result.data".
+  //
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read exactly "result.size" bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'. The Slices must point to 
already-allocated
+  // buffers for the data to be written to.
+  //
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read exactly the aggregate size of "results", an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, ArrayView<Slice> results) const = 0;
+
+  // Stores the size of the file in *size.
+  virtual Status Size(uint64_t *size) const = 0;
+
+  // Returns the filename provided when the RandomAccessFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+  // Returns the approximate memory usage of this RandomAccessFile including
+  // the object itself.
+  virtual size_t memory_footprint() const = 0;
+};
+
+// Creation-time options for WritableFile
+struct WritableFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  WritableFileOptions()
+    : sync_on_close(false),  // By default, Close() does not Sync().
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }  // By default, create or open-and-truncate.
+};
+
+// Options specified when a file is opened for random access.
+struct RandomAccessFileOptions {
+  RandomAccessFileOptions() {}  // The struct currently carries no options.
+};
+
+// A file abstraction for sequential writing.  The implementation
+// must provide buffering since callers may append small fragments
+// at a time to the file.
+class WritableFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  WritableFile() { }
+  virtual ~WritableFile();
+
+  virtual Status Append(const Slice& data) = 0;  // Appends 'data' to the file.
+
+  // If possible, uses scatter-gather I/O to efficiently append
+  // multiple buffers to a file. Otherwise, falls back to regular I/O.
+  //
+  // For implementation specific quirks and details, see comments in
+  // implementation source code (e.g., env_posix.cc)
+  virtual Status AppendV(ArrayView<const Slice> data) = 0;
+
+  // Pre-allocates 'size' bytes for the file in the underlying filesystem.
+  // The 'size' bytes are added to the current pre-allocated size or to the
+  // current offset, whichever is bigger. In no case is the file truncated by
+  // this operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks.
+  //
+  // In no case is the file truncated by this operation.
+  virtual Status PreAllocate(uint64_t size) = 0;
+
+  virtual Status Close() = 0;  // See WritableFileOptions::sync_on_close for Sync()-on-Close().
+
+  // Flush all dirty data (not metadata) to disk.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode) = 0;
+
+  virtual Status Sync() = 0;
+
+  virtual uint64_t Size() const = 0;
+
+  // Returns the filename provided when the WritableFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  // No copying allowed
+  WritableFile(const WritableFile&);
+  void operator=(const WritableFile&);
+};
+
+// Creation-time options for RWFile
+struct RWFileOptions {
+  // Call Sync() during Close().
+  bool sync_on_close;
+
+  // See CreateMode for details.
+  Env::CreateMode mode;
+
+  RWFileOptions()
+    : sync_on_close(false),  // By default, Close() does not Sync().
+      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }  // By default, create or open-and-truncate.
+};
+
+// A file abstraction for both reading and writing. No notion of a built-in
+// file offset is ever used; instead, all operations must provide an
+// explicit offset.
+//
+// All operations are safe for concurrent use by multiple threads (unless
+// noted otherwise) bearing in mind the usual filesystem coherency guarantees
+// (e.g. two threads that write concurrently to the same file offset will
+// probably yield garbage).
+class RWFile {
+ public:
+  enum FlushMode {
+    FLUSH_SYNC,
+    FLUSH_ASYNC
+  };
+
+  RWFile() {
+  }
+
+  virtual ~RWFile();
+
+  // Read "result.size" bytes from the file starting at "offset".
+  // Copies the resulting data into "result.data".
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read exactly "result.size" bytes, an IOError is returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status Read(uint64_t offset, Slice result) const = 0;
+
+  // Reads up to the "results" aggregate size, based on each Slice's "size",
+  // from the file starting at 'offset'. The Slices must point to
+  // already-allocated buffers for the data to be written to.
+  //
+  // If an error was encountered, returns a non-OK status.
+  //
+  // This method will internally retry on EINTR and "short reads" in order to
+  // fully read the requested number of bytes. In the event that it is not
+  // possible to read exactly the aggregate size of "results", an IOError is
+  // returned.
+  //
+  // Safe for concurrent use by multiple threads.
+  virtual Status ReadV(uint64_t offset, ArrayView<Slice> results) const = 0;
+
+  // Writes 'data' to the file position given by 'offset'.
+  virtual Status Write(uint64_t offset, const Slice& data) = 0;
+
+  // Writes the 'data' slices to the file position given by 'offset'.
+  virtual Status WriteV(uint64_t offset, ArrayView<const Slice> data) = 0;
+
+  // Preallocates 'length' bytes for the file in the underlying filesystem
+  // beginning at 'offset'. It is safe to preallocate the same range
+  // repeatedly; this is an idempotent operation.
+  //
+  // On some implementations, preallocation is done without initializing the
+  // contents of the data blocks (as opposed to writing zeroes), requiring no
+  // IO to the data blocks. On such implementations, this is much faster than
+  // using Truncate() to increase the file size.
+  //
+  // In no case is the file truncated by this operation.
+  //
+  // 'mode' controls whether the file's logical size grows to include the
+  // preallocated space, or whether it remains the same.
+  enum PreAllocateMode {
+    CHANGE_FILE_SIZE,
+    DONT_CHANGE_FILE_SIZE
+  };
+  virtual Status PreAllocate(uint64_t offset,
+                             size_t length,
+                             PreAllocateMode mode) = 0;
+
+  // Truncate the file to 'length' bytes. If 'length' is less than the
+  // previous file size, the extra data will be lost. If 'length' is greater
+  // than the previous file size, the file length is extended, and the
+  // extended portion will contain null bytes ('\0').
+  virtual Status Truncate(uint64_t length) = 0;
+
+  // Deallocates space given by 'offset' and 'length' from the file,
+  // effectively "punching a hole" in it. The space will be reclaimed by
+  // the filesystem and reads to that range will return zeroes. Useful
+  // for making whole files sparse.
+  //
+  // Filesystems that don't implement this will return an error.
+  virtual Status PunchHole(uint64_t offset, size_t length) = 0;
+
+  // Flushes the range of dirty data (not metadata) given by 'offset' and
+  // 'length' to disk. If length is 0, all bytes from 'offset' to the end
+  // of the file are flushed.
+  //
+  // If the flush mode is synchronous, will wait for flush to finish and
+  // return a meaningful status.
+  virtual Status Flush(FlushMode mode, uint64_t offset, size_t length) = 0;
+
+  // Synchronously flushes all dirty file data and metadata to disk. Upon
+  // returning successfully, all previously issued file changes have been
+  // made durable.
+  virtual Status Sync() = 0;
+
+  // Closes the file, optionally calling Sync() on it if the file was
+  // created with the sync_on_close option enabled.
+  //
+  // Not thread-safe.
+  virtual Status Close() = 0;
+
+  // Retrieves the file's size.
+  virtual Status Size(uint64_t* size) const = 0;
+
+  // Retrieve a map of the file's live extents.
+  //
+  // Each map entry is an offset and size representing a section of live file
+  // data. Any byte offset not contained in a map entry implicitly belongs to a
+  // "hole" in the (sparse) file.
+  //
+  // If the underlying filesystem does not support extents, map entries
+  // represent runs of adjacent fixed-size filesystem blocks instead. If the
+  // platform doesn't support fetching extents at all, a NotSupported status
+  // will be returned.
+  typedef std::map<uint64_t, uint64_t> ExtentMap;
+  virtual Status GetExtentMap(ExtentMap* out) const = 0;
+
+  // Returns the filename provided when the RWFile was constructed.
+  virtual const std::string& filename() const = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(RWFile);
+};
+
+// Identifies a locked file.
+//
+// Instances cannot be copied or assigned.
+class FileLock {
+ public:
+  FileLock() { }
+  virtual ~FileLock();
+
+ private:
+  // Copying is explicitly deleted so that any attempt (including one made by
+  // a member or friend) fails at compile time rather than at link time.
+  FileLock(const FileLock&) = delete;
+  void operator=(const FileLock&) = delete;
+};
+
+// Utility routine: writes the bytes of 'data' to the file named 'fname'.
+extern Status WriteStringToFile(Env* env, const Slice& data,
+                                const std::string& fname);
+
+// Utility routine: reads the contents of the file named 'fname' into '*data'.
+extern Status ReadFileToString(Env* env, const std::string& fname,
+                               faststring* data);
+
+// Stream-insertion overload for printing an Env::ResourceLimitType value 't'.
+std::ostream& operator<<(std::ostream& o, Env::ResourceLimitType t);
+
+}  // namespace kudu
+
+#endif  // STORAGE_LEVELDB_INCLUDE_ENV_H_

Reply via email to