This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit b1fa1a65a008591475b95f8bc0c90b3283826c5d
Author: Attila Bukor <[email protected]>
AuthorDate: Tue Oct 26 13:59:20 2021 +0200

    [security] KUDU-3331 Encrypt file system
    
    de02a34 introduced encryption support to Env in a self-contained way,
    but it's not used across Kudu.
    
    This commit integrates this encryption support into the project and
    modifies several test suites to also run tests with encryption enabled.
    
    I also renamed "encrypted" to "is_sensitive" in *FileOption as a file
    with this flag will be encrypted only if encryption is enabled for the
    process.
    
    When encryption is enabled, the following files are encrypted:
    
    - WAL segments
    - LBM blocks and metadata
    - FBM blocks
    - tablet and consensus metadata
    
    Logs, rolling logs, instance and block manager instance files, and
    configuration files in integration tests are not encrypted.
    
    As FileCache is not used to access instance files, it only supports
    handling sensitive files and can't be used to access unencrypted files.
    
    As the PBC CLI tool can be used to dump encrypted (metadata) and
    unencrypted files (instance) as well, it needs to be able to determine
    if a file is encrypted or not. As encryption headers are not yet
    implemented, I introduced a hack which checks the file name and treats
    the file as unencrypted if it ends with "instance" and encrypted
    otherwise.
    
    I ran some benchmarks to compare running Kudu with encryption enabled
    and disabled.
    
    The following are StartupBenchmark tests run with KUDU_ALLOW_SLOW_TESTS
    set to true, which uses a block count of 1,000,000.
    
    It seems enabling encryption adds around 20% overhead on startup in a
    typical use-case with no deletes. All tests below were run in release
    mode.
    
     Performance counter stats for './bin/log_block_manager-test 
--gtest_filter=*StartupBenchmark/0' (10 runs):
    
          40391.075316      task-clock (msec)         #    2.021 CPUs utilized  
          ( +-  1.05% )
                11,089      context-switches          #    0.275 K/sec          
          ( +-  9.87% )
                   280      cpu-migrations            #    0.007 K/sec          
          ( +-  1.58% )
               593,982      page-faults               #    0.015 M/sec          
          ( +-  2.13% )
       110,595,311,391      cycles                    #    2.738 GHz            
          ( +-  1.03% )
        90,580,214,722      instructions              #    0.82  insn per cycle 
          ( +-  0.14% )
        16,449,237,957      branches                  #  407.249 M/sec          
          ( +-  0.15% )
            67,169,915      branch-misses             #    0.41% of all 
branches          ( +-  0.49% )
    
          19.988553457 seconds time elapsed                                     
     ( +-  0.58% )
    
     Performance counter stats for './bin/log_block_manager-test 
--encrypt_data_at_rest=1 --gtest_filter=*StartupBenchmark/0' (10 runs):
    
          51317.845606      task-clock (msec)         #    2.133 CPUs utilized  
          ( +-  0.90% )
                13,214      context-switches          #    0.257 K/sec          
          ( +-  4.03% )
                   292      cpu-migrations            #    0.006 K/sec          
          ( +-  1.76% )
               737,815      page-faults               #    0.014 M/sec          
          ( +-  1.49% )
       144,898,246,536      cycles                    #    2.824 GHz            
          ( +-  1.08% )
       126,702,271,070      instructions              #    0.87  insn per cycle 
          ( +-  0.05% )
        24,116,649,584      branches                  #  469.947 M/sec          
          ( +-  0.05% )
           106,793,688      branch-misses             #    0.44% of all 
branches          ( +-  0.35% )
    
          24.055824830 seconds time elapsed                                     
     ( +-  0.89% )
    
    With deletes, the difference seems to decrease to about 14% when 90% of
    the blocks are deleted.
    
     Performance counter stats for './bin/log_block_manager-test 
--gtest_filter=*StartupBenchmark/1' (10 runs):
    
          53247.212289      task-clock (msec)         #    1.494 CPUs utilized  
          ( +-  0.69% )
                94,868      context-switches          #    0.002 M/sec          
          ( +-  0.13% )
                   530      cpu-migrations            #    0.010 K/sec          
          ( +-  1.48% )
               399,284      page-faults               #    0.007 M/sec          
          ( +-  1.66% )
       145,147,457,046      cycles                    #    2.726 GHz            
          ( +-  0.48% )
       141,892,983,444      instructions              #    0.98  insn per cycle 
          ( +-  0.04% )
        26,167,495,753      branches                  #  491.434 M/sec          
          ( +-  0.04% )
            59,986,442      branch-misses             #    0.23% of all 
branches          ( +-  0.33% )
    
          35.648681894 seconds time elapsed                                     
     ( +-  1.40% )
    
     Performance counter stats for './bin/log_block_manager-test 
--encrypt_data_at_rest=1 --gtest_filter=*StartupBenchmark/1' (10 runs):
    
          70616.598642      task-clock (msec)         #    1.737 CPUs utilized  
          ( +-  0.81% )
                95,082      context-switches          #    0.001 M/sec          
          ( +-  0.28% )
                   523      cpu-migrations            #    0.007 K/sec          
          ( +-  1.69% )
               679,834      page-faults               #    0.010 M/sec          
          ( +-  1.66% )
       203,066,615,244      cycles                    #    2.876 GHz            
          ( +-  1.05% )
       209,355,734,267      instructions              #    1.03  insn per cycle 
          ( +-  0.08% )
        40,477,560,095      branches                  #  573.202 M/sec          
          ( +-  0.07% )
           133,637,310      branch-misses             #    0.33% of all 
branches          ( +-  1.48% )
    
          40.653406472 seconds time elapsed                                     
     ( +-  1.52% )
    
    Delete tablet benchmark takes less than a second to run, so I ran it
    1000 times with encryption disabled and enabled. It seems encryption
    adds about 30% overhead in this case.
    
     Performance counter stats for './bin/tablet_server-test 
--gtest_filter=TabletServerTest.TestDeleteTabletBenchmark' (1000 runs):
    
            735.800649      task-clock (msec)         #    0.994 CPUs utilized  
          ( +-  0.33% )
                 3,613      context-switches          #    0.005 M/sec          
          ( +-  0.15% )
                   178      cpu-migrations            #    0.242 K/sec          
          ( +-  0.29% )
                10,722      page-faults               #    0.015 M/sec          
          ( +-  0.08% )
         1,316,404,469      cycles                    #    1.789 GHz            
          ( +-  0.19% )
         1,629,691,550      instructions              #    1.24  insn per cycle 
          ( +-  0.21% )
           337,778,107      branches                  #  459.062 M/sec          
          ( +-  0.19% )
             6,340,956      branch-misses             #    1.88% of all 
branches          ( +-  0.21% )
    
           0.739940005 seconds time elapsed                                     
     ( +-  2.33% )
    
     Performance counter stats for './bin/tablet_server-test 
--encrypt_data_at_rest=1 
--gtest_filter=TabletServerTest.TestDeleteTabletBenchmark' (1000 runs):
    
            769.368354      task-clock (msec)         #    0.792 CPUs utilized  
          ( +-  0.34% )
                 3,633      context-switches          #    0.005 M/sec          
          ( +-  0.13% )
                   183      cpu-migrations            #    0.238 K/sec          
          ( +-  0.29% )
                10,737      page-faults               #    0.014 M/sec          
          ( +-  0.07% )
         1,356,327,815      cycles                    #    1.763 GHz            
          ( +-  0.14% )
         1,635,206,270      instructions              #    1.21  insn per cycle 
          ( +-  0.06% )
           338,261,840      branches                  #  439.662 M/sec          
          ( +-  0.06% )
             6,486,125      branch-misses             #    1.92% of all 
branches          ( +-  0.21% )
    
           0.971974609 seconds time elapsed                                     
     ( +-  2.42% )
    
    Change-Id: I909d0c4af0c1fca0d14c99a6627842dbe2ed7524
    Reviewed-on: http://gerrit.cloudera.org:8080/17974
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/consensus/consensus_meta-test.cc          |  36 ++++--
 src/kudu/consensus/consensus_meta.cc               |   6 +-
 src/kudu/consensus/log.cc                          |   4 +-
 src/kudu/consensus/log_index.cc                    |   1 +
 src/kudu/consensus/log_util.cc                     |   1 +
 src/kudu/fs/block_manager-test.cc                  |   7 +-
 src/kudu/fs/dir_manager.cc                         |   3 +-
 src/kudu/fs/dir_util.cc                            |  11 +-
 src/kudu/fs/file_block_manager.cc                  |   5 +-
 src/kudu/fs/fs_manager-test.cc                     |   5 +-
 src/kudu/fs/fs_manager.cc                          |   5 +-
 src/kudu/fs/log_block_manager-test-util.cc         |  16 ++-
 src/kudu/fs/log_block_manager-test.cc              | 128 ++++++++++++++-------
 src/kudu/fs/log_block_manager.cc                   |  16 ++-
 src/kudu/integration-tests/dense_node-itest.cc     |  13 ++-
 .../integration-tests/mini_cluster_fs_inspector.cc |   8 +-
 src/kudu/integration-tests/raft_consensus-itest.cc |  27 ++++-
 src/kudu/integration-tests/security-itest.cc       |   1 +
 src/kudu/mini-cluster/external_mini_cluster.cc     |   6 +
 src/kudu/mini-cluster/external_mini_cluster.h      |  10 +-
 src/kudu/postgres/mini_postgres.cc                 |   4 +-
 src/kudu/ranger/ranger_client.cc                   |   4 +-
 src/kudu/security/test/mini_kdc.cc                 |   4 +-
 src/kudu/tablet/tablet_metadata.cc                 |   5 +-
 src/kudu/tools/kudu-tool-test.cc                   |   5 +-
 src/kudu/tools/tool_action_pbc.cc                  |  26 ++++-
 src/kudu/tserver/tablet_copy_client.cc             |  11 +-
 .../tserver/tablet_copy_source_session-test.cc     |   5 +-
 src/kudu/tserver/tablet_server-test.cc             |   4 +-
 src/kudu/util/env-test.cc                          |  12 +-
 src/kudu/util/env.cc                               |  21 +++-
 src/kudu/util/env.h                                |  26 +++--
 src/kudu/util/env_posix.cc                         |  20 +++-
 src/kudu/util/env_util.cc                          |  10 +-
 src/kudu/util/file_cache-test.cc                   |  47 ++++++--
 src/kudu/util/file_cache.cc                        |   5 +-
 src/kudu/util/pb_util-test.cc                      |  87 ++++++++------
 src/kudu/util/pb_util.cc                           |  25 ++--
 src/kudu/util/pb_util.h                            |  11 +-
 src/kudu/util/rolling_log.cc                       |   5 +-
 src/kudu/util/yamlreader-test.cc                   |   4 +-
 41 files changed, 460 insertions(+), 190 deletions(-)

diff --git a/src/kudu/consensus/consensus_meta-test.cc 
b/src/kudu/consensus/consensus_meta-test.cc
index 50616f6..50e26ae 100644
--- a/src/kudu/consensus/consensus_meta-test.cc
+++ b/src/kudu/consensus/consensus_meta-test.cc
@@ -25,6 +25,7 @@
 #include <string>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -42,6 +43,8 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_bool(encrypt_data_at_rest);
+
 namespace kudu {
 namespace consensus {
 
@@ -52,7 +55,7 @@ using std::vector;
 const char* kTabletId = "test-consensus-metadata";
 const int64_t kInitialTerm = 3;
 
-class ConsensusMetadataTest : public KuduTest {
+class ConsensusMetadataTest : public KuduTest, public 
::testing::WithParamInterface<bool> {
  public:
   ConsensusMetadataTest()
       : fs_manager_(env_, FsManagerOpts(GetTestPath("fs_root"))) {
@@ -75,6 +78,11 @@ class ConsensusMetadataTest : public KuduTest {
   void AssertValuesEqual(const scoped_refptr<ConsensusMetadata>& cmeta,
                          int64_t opid_index, const string& permanant_uuid, 
int64_t term);
 
+
+  void EnableEncryption(bool enable) {
+    FLAGS_encrypt_data_at_rest = enable;  // Follow the test parameter; don't force it on.
+  }
+
   FsManager fs_manager_;
   RaftConfigPB config_;
 };
@@ -92,8 +100,11 @@ void ConsensusMetadataTest::AssertValuesEqual(const 
scoped_refptr<ConsensusMetad
   ASSERT_EQ(term, cmeta->current_term());
 }
 
+INSTANTIATE_TEST_SUITE_P(, ConsensusMetadataTest, ::testing::Values(false, 
true));
+
 // Test the basic "happy case" of creating and then loading a file.
-TEST_F(ConsensusMetadataTest, TestCreateLoad) {
+TEST_P(ConsensusMetadataTest, TestCreateLoad) {
+  EnableEncryption(GetParam());
   // Create the file.
   {
     ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, 
fs_manager_.uuid(),
@@ -108,7 +119,8 @@ TEST_F(ConsensusMetadataTest, TestCreateLoad) {
 }
 
 // Test deferred creation.
-TEST_F(ConsensusMetadataTest, TestDeferredCreateLoad) {
+TEST_P(ConsensusMetadataTest, TestDeferredCreateLoad) {
+  EnableEncryption(GetParam());
   // Create the cmeta object, but not the file.
   scoped_refptr<ConsensusMetadata> writer;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, 
fs_manager_.uuid(),
@@ -128,7 +140,8 @@ TEST_F(ConsensusMetadataTest, TestDeferredCreateLoad) {
 }
 
 // Ensure that Create() will not overwrite an existing file.
-TEST_F(ConsensusMetadataTest, TestCreateNoOverwrite) {
+TEST_P(ConsensusMetadataTest, TestCreateNoOverwrite) {
+  EnableEncryption(GetParam());
   // Create the consensus metadata file.
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, 
fs_manager_.uuid(),
                                       config_, kInitialTerm));
@@ -140,14 +153,16 @@ TEST_F(ConsensusMetadataTest, TestCreateNoOverwrite) {
 }
 
 // Ensure that we get an error when loading a file that doesn't exist.
-TEST_F(ConsensusMetadataTest, TestFailedLoad) {
+TEST_P(ConsensusMetadataTest, TestFailedLoad) {
+  EnableEncryption(GetParam());
   Status s = ConsensusMetadata::Load(&fs_manager_, kTabletId, 
fs_manager_.uuid());
   ASSERT_TRUE(s.IsNotFound()) << "Unexpected status: " << s.ToString();
   LOG(INFO) << "Expected failure: " << s.ToString();
 }
 
 // Check that changes are not written to disk until Flush() is called.
-TEST_F(ConsensusMetadataTest, TestFlush) {
+TEST_P(ConsensusMetadataTest, TestFlush) {
+  EnableEncryption(GetParam());
   const int64_t kNewTerm = 4;
   scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, 
fs_manager_.uuid(),
@@ -190,7 +205,8 @@ RaftConfigPB BuildConfig(const vector<string>& uuids) {
 }
 
 // Test ConsensusMetadata active role calculation.
-TEST_F(ConsensusMetadataTest, TestActiveRole) {
+TEST_P(ConsensusMetadataTest, TestActiveRole) {
+  EnableEncryption(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
   string peer_uuid = "e";
   RaftConfigPB config1 = BuildConfig(uuids); // We aren't a member of this 
config...
@@ -256,7 +272,8 @@ TEST_F(ConsensusMetadataTest, TestActiveRole) {
 
 // Ensure that invocations of ToConsensusStatePB() return the expected state
 // in the returned object.
-TEST_F(ConsensusMetadataTest, TestToConsensusStatePB) {
+TEST_P(ConsensusMetadataTest, TestToConsensusStatePB) {
+  EnableEncryption(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
   string peer_uuid = "e";
 
@@ -315,7 +332,8 @@ static void AssertConsensusMergeExpected(const 
scoped_refptr<ConsensusMetadata>&
 }
 
 // Ensure that MergeCommittedConsensusStatePB() works as advertised.
-TEST_F(ConsensusMetadataTest, TestMergeCommittedConsensusStatePB) {
+TEST_P(ConsensusMetadataTest, TestMergeCommittedConsensusStatePB) {
+  EnableEncryption(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
 
   RaftConfigPB committed_config = BuildConfig(uuids); // We aren't a member of 
this config...
diff --git a/src/kudu/consensus/consensus_meta.cc 
b/src/kudu/consensus/consensus_meta.cc
index 60d3758..8f520fe 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -319,7 +319,8 @@ Status ConsensusMetadata::Flush(FlushMode flush_mode) {
       // fsync() due to periodic commit with default settings, whereas other
       // filesystems such as XFS will not commit as often and need the fsync to
       // avoid significant data loss when a crash happens.
-      FLAGS_log_force_fsync_all || cmeta_force_fsync ? pb_util::SYNC : 
pb_util::NO_SYNC),
+      FLAGS_log_force_fsync_all || cmeta_force_fsync ? pb_util::SYNC : 
pb_util::NO_SYNC,
+      pb_util::SENSITIVE),
           Substitute("Unable to write consensus meta file for tablet $0 to 
path $1",
                      tablet_id_, meta_file_path));
   return UpdateOnDiskSize();
@@ -376,7 +377,8 @@ Status ConsensusMetadata::Load(FsManager* fs_manager,
   scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, 
tablet_id, peer_uuid));
   RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(),
                                                  
fs_manager->GetConsensusMetadataPath(tablet_id),
-                                                 &cmeta->pb_));
+                                                 &cmeta->pb_,
+                                                 pb_util::SENSITIVE));
   cmeta->UpdateActiveRole(); // Needs to happen here as we sidestep the 
accessor APIs.
 
   RETURN_NOT_OK(cmeta->UpdateOnDiskSize());
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 716e7e5..1e49722 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -639,8 +639,10 @@ Status SegmentAllocator::AllocateNewSegment() {
   VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, 
template: " << path_tmpl;
   unique_ptr<RWFile> segment_file;
   Env* env = ctx_->fs_manager->env();
+  RWFileOptions opts;
+  opts.is_sensitive = true;
   RETURN_NOT_OK_PREPEND(env->NewTempRWFile(
-      RWFileOptions(), path_tmpl, &next_segment_path_, &segment_file),
+      opts, path_tmpl, &next_segment_path_, &segment_file),
                         "could not create next WAL segment");
   next_segment_file_.reset(segment_file.release());
   VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << 
next_segment_path_;
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index a891dc5..18d160d 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -109,6 +109,7 @@ Status LogIndex::IndexChunk::Open(FileCache* file_cache) {
     unique_ptr<RWFile> f;
     RWFileOptions opts;
     opts.mode = Env::CREATE_OR_OPEN;
+    opts.is_sensitive = true;
     RETURN_NOT_OK(env_->NewRWFile(opts, path_, &f));
     file_.reset(f.release());
   }
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 0dd655c..2455fa8 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -271,6 +271,7 @@ Status ReadableLogSegment::Open(Env* env,
     unique_ptr<RWFile> f;
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     RETURN_NOT_OK_PREPEND(env->NewRWFile(opts, path, &f),
                           "Unable to open file for reading");
     file.reset(f.release());
diff --git a/src/kudu/fs/block_manager-test.cc 
b/src/kudu/fs/block_manager-test.cc
index a861a2c..7407e3e 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -327,7 +327,8 @@ void 
BlockManagerTest<FileBlockManager>::RunMultipathTest(const vector<string>&
       DirInstanceMetadataPB instance;
       ASSERT_OK(pb_util::ReadPBContainerFromPath(env_,
                                                  JoinPathSegments(path, child),
-                                                 &instance));
+                                                 &instance,
+                                                 pb_util::NOT_SENSITIVE));
     }
   }
   // Create a DataDirGroup for the data that's about to be inserted.
@@ -1050,7 +1051,9 @@ TYPED_TEST(BlockManagerTest, TestGetAllBlockIds) {
                          string("12345"), // not a real block ID
                          ids.begin()->ToString() }) { // not in a block 
directory
     unique_ptr<WritableFile> writer;
-    ASSERT_OK(this->env_->NewWritableFile(
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    ASSERT_OK(this->env_->NewWritableFile(opts,
         JoinPathSegments(this->test_dir_, s), &writer));
     ASSERT_OK(writer->Close());
   }
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index e53be7c..61f3e92 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -386,7 +386,8 @@ Status DirManager::UpdateHealthyInstances(
     CHECK_EQ(1, copies_to_delete.erase(copy_filename));
     Status s = pb_util::WritePBContainerToPath(
         env_, instance_filename, new_pb, pb_util::OVERWRITE,
-        sync_dirs() ? pb_util::SYNC : pb_util::NO_SYNC);
+        sync_dirs() ? pb_util::SYNC : pb_util::NO_SYNC,
+        pb_util::NOT_SENSITIVE);
     // We've failed to update for some reason, so restore our original file.
     // Since we're renaming our copy, we don't have to delete it.
     if (PREDICT_FALSE(!s.ok())) {
diff --git a/src/kudu/fs/dir_util.cc b/src/kudu/fs/dir_util.cc
index 9b12c3b..8d1a8a8 100644
--- a/src/kudu/fs/dir_util.cc
+++ b/src/kudu/fs/dir_util.cc
@@ -70,6 +70,7 @@ Status CheckHolePunch(Env* env, const string& path) {
   string filename = JoinPathSegments(path, "hole_punch_test_file");
   unique_ptr<RWFile> file;
   RWFileOptions opts;
+  opts.is_sensitive = false;
   RETURN_NOT_OK(env->NewRWFile(opts, filename, &file));
 
   // The file has been created; delete it on exit no matter what happens.
@@ -174,8 +175,10 @@ Status DirInstanceMetadataFile::Create(const set<string>& 
all_uuids,
   string tmp_template = JoinPathSegments(
       dir_name, Substitute("getblocksize$0.XXXXXX", kTmpInfix));
   unique_ptr<WritableFile> tmp_file;
+  WritableFileOptions opts;
+  opts.is_sensitive = false;
   RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(
-      env_->NewTempWritableFile(WritableFileOptions(),
+      env_->NewTempWritableFile(opts,
                                 tmp_template,
                                 &created_filename, &tmp_file),
       "failed to create temp file while checking block size");
@@ -203,7 +206,8 @@ Status DirInstanceMetadataFile::Create(const set<string>& 
all_uuids,
   RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(pb_util::WritePBContainerToPath(
       env_, filename_, new_instance,
       pb_util::NO_OVERWRITE,
-      FLAGS_enable_data_block_fsync ? pb_util::SYNC : pb_util::NO_SYNC),
+      FLAGS_enable_data_block_fsync ? pb_util::SYNC : pb_util::NO_SYNC,
+      pb_util::NOT_SENSITIVE),
       "failed to write PB");
 
   // Now that we're returning success, we don't need to clean anything up, and
@@ -221,7 +225,8 @@ Status DirInstanceMetadataFile::LoadFromDisk() {
       "Opening a metadata file that's already locked would release the lock";
 
   unique_ptr<DirInstanceMetadataPB> pb(new DirInstanceMetadataPB());
-  RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(pb_util::ReadPBContainerFromPath(env_, 
filename_, pb.get()),
+  RETURN_NOT_OK_FAIL_INSTANCE_PREPEND(
+      pb_util::ReadPBContainerFromPath(env_, filename_, pb.get(), 
pb_util::NOT_SENSITIVE),
       Substitute("Failed to read metadata file from $0", filename_));
 
   if (pb->dir_type() != dir_type_) {
diff --git a/src/kudu/fs/file_block_manager.cc 
b/src/kudu/fs/file_block_manager.cc
index 1e9db05..89cf6c8 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -788,6 +788,7 @@ Status FileBlockManager::CreateBlock(const 
CreateBlockOptions& opts,
         error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, 
dir));
     WritableFileOptions wr_opts;
     wr_opts.mode = Env::MUST_CREATE;
+    wr_opts.is_sensitive = true;
     s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
   } while (PREDICT_FALSE(s.IsAlreadyPresent()));
   if (s.ok()) {
@@ -834,7 +835,9 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
         path, &reader));
   } else {
     unique_ptr<RandomAccessFile> r;
-    RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r));
+    RandomAccessFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(opts, path, &r));
     reader.reset(r.release());
   }
   block->reset(new internal::FileReadableBlock(this, block_id, reader));
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 44ca395..a6f43a8 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -857,7 +857,7 @@ TEST_P(FsManagerTestBase, 
TestEIOWhileRunningUpdateDirsTool) {
       // Collect the contents of the InstanceMetadataPB objects.
       const auto instance_path = JoinPathSegments(root, 
FsManager::kInstanceMetadataFileName);
       unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
-      Status s = ReadPBContainerFromPath(env_, instance_path, pb.get());
+      Status s = ReadPBContainerFromPath(env_, instance_path, pb.get(), 
pb_util::NOT_SENSITIVE);
       if (s.IsNotFound()) {
         InsertOrDie(&instances, instance_path, "");
       } else {
@@ -869,7 +869,8 @@ TEST_P(FsManagerTestBase, 
TestEIOWhileRunningUpdateDirsTool) {
       unique_ptr<DirInstanceMetadataPB> bmi_pb(new DirInstanceMetadataPB);
       const auto block_manager_instance = JoinPathSegments(
           JoinPathSegments(root, kDataDirName), kInstanceMetadataFileName);
-      s = ReadPBContainerFromPath(env_, block_manager_instance, bmi_pb.get());
+      s = ReadPBContainerFromPath(env_, block_manager_instance,
+                                  bmi_pb.get(), pb_util::NOT_SENSITIVE);
       if (s.IsNotFound()) {
         InsertOrDie(&instances, block_manager_instance, "");
       } else {
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index d84c751..05b7b0e 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -344,7 +344,7 @@ Status FsManager::PartialOpen(CanonicalizedRootsList* 
missing_roots) {
     }
     unique_ptr<InstanceMetadataPB> pb(new InstanceMetadataPB);
     Status s = pb_util::ReadPBContainerFromPath(env_, 
GetInstanceMetadataPath(root.path),
-                                                pb.get());
+                                                pb.get(), 
pb_util::NOT_SENSITIVE);
     if (PREDICT_FALSE(!s.ok())) {
       if (s.IsNotFound()) {
         if (missing_roots) {
@@ -677,7 +677,8 @@ Status FsManager::WriteInstanceMetadata(const 
InstanceMetadataPB& metadata,
   RETURN_NOT_OK(pb_util::WritePBContainerToPath(env_, path,
                                                 metadata,
                                                 pb_util::NO_OVERWRITE,
-                                                pb_util::SYNC));
+                                                pb_util::SYNC,
+                                                pb_util::NOT_SENSITIVE));
   LOG(INFO) << "Generated new instance metadata in path " << path << ":\n"
             << SecureDebugString(metadata);
   return Status::OK();
diff --git a/src/kudu/fs/log_block_manager-test-util.cc 
b/src/kudu/fs/log_block_manager-test-util.cc
index b308a46..f255698 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -124,6 +124,7 @@ Status LBMCorruptor::PreallocateFullContainer() {
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = true;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   int64_t initial_size;
   RETURN_NOT_OK(PreallocateForBlock(data_file.get(), mode,
@@ -154,6 +155,7 @@ Status LBMCorruptor::AddUnpunchedBlockToFullContainer() {
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = true;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   int64_t block_length = (rand_.Uniform(16) + 1) * fs_block_size;
   int64_t initial_data_size;
@@ -190,14 +192,16 @@ Status LBMCorruptor::CreateIncompleteContainer() {
   // 2. No data file but metadata file exists (and is up to a certain size).
   // 3. Empty data file and metadata file exists (and is up to a certain size).
   int r = rand_.Uniform(3);
+  RWFileOptions opts;
+  opts.is_sensitive = true;
   if (r == 0) {
-    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(opts, data_fname, &data_file));
   } else if (r == 1) {
-    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(opts, metadata_fname, &data_file));
   } else {
     CHECK_EQ(r, 2);
-    RETURN_NOT_OK(env_->NewRWFile(data_fname, &data_file));
-    RETURN_NOT_OK(env_->NewRWFile(metadata_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(opts, data_fname, &data_file));
+    RETURN_NOT_OK(env_->NewRWFile(opts, metadata_fname, &data_file));
   }
 
   if (data_file) {
@@ -228,6 +232,7 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
     unique_ptr<RWFile> data_file;
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
     RETURN_NOT_OK(PreallocateForBlock(data_file.get(), 
RWFile::CHANGE_FILE_SIZE,
                                       kBlockSize, &initial_data_size));
@@ -293,6 +298,7 @@ Status LBMCorruptor::AddMisalignedBlockToContainer() {
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = true;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   uint64_t initial_data_size;
   RETURN_NOT_OK(data_file->Size(&initial_data_size));
@@ -361,6 +367,7 @@ Status LBMCorruptor::AddPartialRecordToContainer() {
   {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     unique_ptr<RWFile> metadata_file;
     RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
     uint64_t initial_metadata_size;
@@ -423,6 +430,7 @@ Status LBMCorruptor::OpenMetadataWriter(
     unique_ptr<WritablePBContainerFile>* writer) {
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = true;
   unique_ptr<RWFile> metadata_file;
   RETURN_NOT_OK(env_->NewRWFile(opts,
                                 container.metadata_filename,
diff --git a/src/kudu/fs/log_block_manager-test.cc 
b/src/kudu/fs/log_block_manager-test.cc
index 9bd6e24..638d310 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -98,6 +98,7 @@ DEFINE_int32(startup_benchmark_deleted_block_percentage, 90,
              "Percentage of deleted blocks in containers.");
 DEFINE_validator(startup_benchmark_deleted_block_percentage,
                  [](const char* /*n*/, int32_t v) { return 0 <= v && v <= 100; 
});
+DECLARE_bool(encrypt_data_at_rest);
 
 // Block manager metrics.
 METRIC_DECLARE_counter(block_manager_total_blocks_deleted);
@@ -117,7 +118,7 @@ namespace internal {
 class LogBlockContainer;
 } // namespace internal
 
-class LogBlockManagerTest : public KuduTest {
+class LogBlockManagerTest : public KuduTest, public 
::testing::WithParamInterface<bool> {
  public:
   LogBlockManagerTest() :
       test_tablet_name_("test_tablet"),
@@ -231,6 +232,10 @@ class LogBlockManagerTest : public KuduTest {
     ASSERT_TRUE(report.partial_record_check->entries.empty());
   }
 
+  void EnableEncryption(bool enable) {
+    FLAGS_encrypt_data_at_rest = enable;
+  }
+
   DataDirGroupPB test_group_pb_;
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
@@ -327,7 +332,10 @@ static void CheckLogMetrics(const 
scoped_refptr<MetricEntity>& entity,
   }
 }
 
-TEST_F(LogBlockManagerTest, MetricsTest) {
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest, 
::testing::Values(false, true));
+
+TEST_P(LogBlockManagerTest, MetricsTest) {
+  EnableEncryption(GetParam());
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = 
METRIC_ENTITY_server.Instantiate(&registry, "test");
   ASSERT_OK(ReopenBlockManager(entity));
@@ -499,7 +507,8 @@ TEST_F(LogBlockManagerTest, MetricsTest) {
         {0, &METRIC_log_block_manager_dead_containers_deleted} }));
 }
 
-TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
+TEST_P(LogBlockManagerTest, ContainerPreallocationTest) {
+  EnableEncryption(GetParam());
   string kTestData = "test data";
 
   // For this test to work properly, the preallocation window has to be at
@@ -543,7 +552,8 @@ TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
 
 // Test for KUDU-2202 to ensure that once the block manager has been notified
 // of a block ID, it will not reuse it.
-TEST_F(LogBlockManagerTest, TestBumpBlockIds) {
+TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
+  EnableEncryption(GetParam());
   const int kNumBlocks = 10;
   vector<BlockId> block_ids;
   unique_ptr<WritableBlock> writer;
@@ -575,7 +585,8 @@ TEST_F(LogBlockManagerTest, TestBumpBlockIds) {
 
 // Regression test for KUDU-1190, a crash at startup when a block ID has been
 // reused.
-TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
+TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
+  EnableEncryption(GetParam());
   // Typically, the LBM starts with a random block ID when running as a
   // gtest. In this test, we want to control the block IDs.
   bm_->next_block_id_.Store(1);
@@ -649,7 +660,8 @@ TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
 //
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
-TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+TEST_P(LogBlockManagerTest, TestMetadataTruncation) {
+  EnableEncryption(GetParam());
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -690,6 +702,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
     {
       RWFileOptions opts;
       opts.mode = Env::MUST_EXIST;
+      opts.is_sensitive = true;
       unique_ptr<RWFile> file;
       ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
       ASSERT_OK(file->Truncate(good_meta_size + num_bytes));
@@ -748,6 +761,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     unique_ptr<RWFile> file;
     ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
     ASSERT_OK(file->Truncate(good_meta_size - 1));
@@ -795,7 +809,9 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // Find location of 2nd record in metadata file and corrupt it.
   // This is an unrecoverable error because it's in the middle of the file.
   unique_ptr<RandomAccessFile> meta_file;
-  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  RandomAccessFileOptions raf_opts;
+  raf_opts.is_sensitive = true;
+  ASSERT_OK(env_->NewRandomAccessFile(raf_opts, metadata_path, &meta_file));
   ReadablePBContainerFile pb_reader(std::move(meta_file));
   ASSERT_OK(pb_reader.Open());
   BlockRecordPB record;
@@ -804,7 +820,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   uint64_t latest_meta_size;
   ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
-  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  ASSERT_OK(env_->NewRandomAccessFile(raf_opts, metadata_path, &meta_file));
   unique_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
   Slice result(scratch.get(), latest_meta_size);
   ASSERT_OK(meta_file->Read(0, result));
@@ -814,7 +830,9 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // value and also cause the length checksum not to validate.
   data[offset + 3] ^= 1 << 7;
   unique_ptr<WritableFile> writable_file;
-  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
+  WritableFileOptions wf_opts;
+  wf_opts.is_sensitive = true;
+  ASSERT_OK(env_->NewWritableFile(wf_opts, metadata_path, &writable_file));
   ASSERT_OK(writable_file->Append(data));
   ASSERT_OK(writable_file->Close());
 
@@ -826,9 +844,9 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
   // Now truncate both the data and metadata files.
   // This should be recoverable. See KUDU-668.
-  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
+  ASSERT_OK(env_->NewWritableFile(wf_opts, metadata_path, &writable_file));
   ASSERT_OK(writable_file->Close());
-  ASSERT_OK(env_->NewWritableFile(data_path, &writable_file));
+  ASSERT_OK(env_->NewWritableFile(wf_opts, data_path, &writable_file));
   ASSERT_OK(writable_file->Close());
 
   ASSERT_OK(ReopenBlockManager());
@@ -836,7 +854,8 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
 
 // Regression test for a crash when a container's append offset exceeded its
 // preallocation offset.
-TEST_F(LogBlockManagerTest, TestAppendExceedsPreallocation) {
+TEST_P(LogBlockManagerTest, TestAppendExceedsPreallocation) {
+  EnableEncryption(GetParam());
   FLAGS_log_container_preallocate_bytes = 1;
 
   // Create a container, preallocate it by one byte, and append more than one.
@@ -851,7 +870,8 @@ TEST_F(LogBlockManagerTest, TestAppendExceedsPreallocation) 
{
   ASSERT_OK(writer->Append("hello world"));
 }
 
-TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
+TEST_P(LogBlockManagerTest, TestPreallocationAndTruncation) {
+  EnableEncryption(GetParam());
   // Ensure preallocation window is greater than the container size itself.
   FLAGS_log_container_max_size = 1024 * 1024;
   FLAGS_log_container_preallocate_bytes = 32 * 1024 * 1024;
@@ -887,6 +907,7 @@ TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     ASSERT_OK(env_->NewRWFile(opts, fname, &data_file));
     ASSERT_OK(data_file->PreAllocate(size_after_close, size_after_close, mode));
     uint64_t size_after_preallocate;
     ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_preallocate));
@@ -912,7 +933,8 @@ TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) 
{
   }
 }
 
-TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
+TEST_P(LogBlockManagerTest, TestContainerWithManyHoles) {
+  EnableEncryption(GetParam());
   // This is a regression test of sorts for KUDU-1508, though it doesn't
   // actually fail if the fix is missing; it just corrupts the filesystem.
 
@@ -970,7 +992,8 @@ TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
   ASSERT_OK(deletion_transaction->CommitDeletedBlocks(&deleted));
 }
 
-TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
+TEST_P(LogBlockManagerTest, TestParseKernelRelease) {
+  EnableEncryption(GetParam());
   ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64"));
 
   // no el6 infix
@@ -1027,10 +1050,7 @@ TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
 //    threads running a long time since last bootstrap)
 //
 // However it still can be used to micro-optimize the startup process.
-class LogBlockManagerStartupBenchmarkTest:
-    public LogBlockManagerTest,
-    public ::testing::WithParamInterface<bool> {
-};
+class LogBlockManagerStartupBenchmarkTest: public LogBlockManagerTest {};
 INSTANTIATE_TEST_SUITE_P(StartupBenchmarkSuite, 
LogBlockManagerStartupBenchmarkTest,
                          ::testing::Values(false, true));
 
@@ -1096,7 +1116,8 @@ TEST_P(LogBlockManagerStartupBenchmarkTest, 
StartupBenchmark) {
 }
 #endif
 
-TEST_F(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer) {
+TEST_P(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer) {
+  EnableEncryption(GetParam());
   // Create multiple transactions that will share a container.
   const int kNumTransactions = 3;
   vector<unique_ptr<BlockCreationTransaction>> block_transactions;
@@ -1154,7 +1175,8 @@ TEST_F(LogBlockManagerTest, 
TestFailMultipleTransactionsPerContainer) {
   }
 }
 
-TEST_F(LogBlockManagerTest, TestLookupBlockLimit) {
+TEST_P(LogBlockManagerTest, TestLookupBlockLimit) {
+  EnableEncryption(GetParam());
   int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024);
   int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048);
   int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096);
@@ -1171,7 +1193,8 @@ TEST_F(LogBlockManagerTest, TestLookupBlockLimit) {
   }
 }
 
-TEST_F(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) {
+TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) {
+  EnableEncryption(GetParam());
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1205,7 +1228,8 @@ TEST_F(LogBlockManagerTest, 
TestContainerBlockLimitingByBlockNum) {
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_F(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
+TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
+  EnableEncryption(GetParam());
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1241,7 +1265,8 @@ TEST_F(LogBlockManagerTest, 
TestContainerBlockLimitingByMetadataSize) {
   NO_FATALS(AssertNumContainers(4));
 }
 
-TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
+TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
+  EnableEncryption(GetParam());
   FLAGS_log_container_preallocate_bytes = 0;
   const int kNumBlocks = 100;
 
@@ -1343,7 +1368,8 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
   }
 }
 
-TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
+TEST_P(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
+  EnableEncryption(GetParam());
   // Enforce that the container's actual size is strictly upper-bounded by the
   // calculated size so we can more easily trigger repairs.
   FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
@@ -1391,7 +1417,8 @@ TEST_F(LogBlockManagerTest, 
TestRepairPreallocateExcessSpace) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
+TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
+  EnableEncryption(GetParam());
   const int kNumBlocks = 100;
 
   // Enforce that the container's actual size is strictly upper-bounded by the
@@ -1451,7 +1478,8 @@ TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
   ASSERT_EQ(0, file_size_on_disk);
 }
 
-TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
+TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
+  EnableEncryption(GetParam());
   const int kNumContainers = 20;
 
   // Create some incomplete containers. The corruptor will select between
@@ -1481,7 +1509,8 @@ TEST_F(LogBlockManagerTest, 
TestRepairIncompleteContainer) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
+TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
+  EnableEncryption(GetParam());
   const int kNumRecords = 50;
 
   // Create one container.
@@ -1513,7 +1542,8 @@ TEST_F(LogBlockManagerTest, TestDetectMalformedRecords) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
+TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
+  EnableEncryption(GetParam());
   const int kNumBlocks = 50;
 
   // Create one container.
@@ -1545,7 +1575,8 @@ TEST_F(LogBlockManagerTest, TestDetectMisalignedBlocks) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
+TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
+  EnableEncryption(GetParam());
   const int kNumContainers = 50;
   const int kNumRecords = 10;
 
@@ -1586,7 +1617,8 @@ TEST_F(LogBlockManagerTest, TestRepairPartialRecords) {
   NO_FATALS(AssertEmptyReport(report));
 }
 
-TEST_F(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
+TEST_P(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
+  EnableEncryption(GetParam());
   // Force our single container to become full once created.
   FLAGS_log_container_max_size = 0;
 
@@ -1623,7 +1655,8 @@ TEST_F(LogBlockManagerTest, 
TestDeleteDeadContainersAtStartup) {
   ASSERT_FALSE(env_->FileExists(metadata_file_name));
 }
 
-TEST_F(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
+TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
+  EnableEncryption(GetParam());
   // With this ratio, the metadata of a full container comprised of half dead
   // blocks will be compacted at startup.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.50;
@@ -1688,7 +1721,8 @@ TEST_F(LogBlockManagerTest, 
TestCompactFullContainerMetadataAtStartup) {
 //
 // The bug was related to a stale file descriptor left in the file_cache, so
 // this test explicitly targets that scenario.
-TEST_F(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
+TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
+  EnableEncryption(GetParam());
   // Compact aggressively.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
   // Use a single shard so that we have an accurate max cache capacity
@@ -1751,7 +1785,8 @@ TEST_F(LogBlockManagerTest, 
TestDeleteFromContainerAfterMetadataCompaction) {
 // Test to ensure that if a directory cannot be read from, its startup process
 // will run smoothly. The directory manager will note the failed directories
 // and only healthy ones are reported.
-TEST_F(LogBlockManagerTest, TestOpenWithFailedDirectories) {
+TEST_P(LogBlockManagerTest, TestOpenWithFailedDirectories) {
+  EnableEncryption(GetParam());
   // Initialize a new directory manager with multiple directories.
   bm_.reset();
   vector<string> test_dirs;
@@ -1800,7 +1835,8 @@ TEST_F(LogBlockManagerTest, 
TestOpenWithFailedDirectories) {
 // 1) a container can be reused when the block is finalized.
 // 2) the block cannot be opened/found until close it.
 // 3) the same container is not marked as available twice.
-TEST_F(LogBlockManagerTest, TestFinalizeBlock) {
+TEST_P(LogBlockManagerTest, TestFinalizeBlock) {
+  EnableEncryption(GetParam());
   // Create 4 blocks.
   vector<unique_ptr<WritableBlock>> blocks;
   for (int i = 0; i < 4; i++) {
@@ -1824,7 +1860,8 @@ TEST_F(LogBlockManagerTest, TestFinalizeBlock) {
 }
 
 // Test available log container selection is LIFO.
-TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
+TEST_P(LogBlockManagerTest, TestLIFOContainerSelection) {
+  EnableEncryption(GetParam());
   // Create 4 blocks and 4 opened containers that are not full.
   vector<unique_ptr<WritableBlock>> blocks;
   for (int i = 0; i < 4; i++) {
@@ -1860,7 +1897,8 @@ TEST_F(LogBlockManagerTest, TestLIFOContainerSelection) {
   ASSERT_EQ(4, bm_->all_containers_by_name_.size());
 }
 
-TEST_F(LogBlockManagerTest, TestAbortBlock) {
+TEST_P(LogBlockManagerTest, TestAbortBlock) {
+  EnableEncryption(GetParam());
   unique_ptr<WritableBlock> writer;
   ASSERT_OK(bm_->CreateBlock(test_block_opts_, &writer));
   ASSERT_OK(writer->Append("test data"));
@@ -1876,7 +1913,8 @@ TEST_F(LogBlockManagerTest, TestAbortBlock) {
   ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
 }
 
-TEST_F(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
+TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
+  EnableEncryption(GetParam());
   const auto TestProcess = [&] (int block_num) {
     ASSERT_GT(block_num, 0);
     MetricRegistry registry;
@@ -1995,7 +2033,8 @@ TEST_F(LogBlockManagerTest, 
TestDeleteDeadContainersByDeletionTransaction) {
 
 // Test for KUDU-2665 to ensure that once the container is full and has no live
 // blocks but with a reference by WritableBlock, it will not be deleted.
-TEST_F(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
+TEST_P(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
+  EnableEncryption(GetParam());
   // Lower the max container size.
   FLAGS_log_container_max_size = 64 * 1024;
 
@@ -2058,7 +2097,8 @@ TEST_F(LogBlockManagerTest, 
TestDoNotDeleteFakeDeadContainer) {
   Process(false);
 }
 
-TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
+TEST_P(LogBlockManagerTest, TestHalfPresentContainer) {
+  EnableEncryption(GetParam());
   BlockId block_id;
   string data_file_name;
   string metadata_file_name;
@@ -2085,7 +2125,9 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
     file_cache_.Invalidate(metadata_file_name);
 
     unique_ptr<WritableFile> metadata_file_writer;
-    ASSERT_OK(env_->NewWritableFile(metadata_file_name, 
&metadata_file_writer));
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    ASSERT_OK(env_->NewWritableFile(opts, metadata_file_name, 
&metadata_file_writer));
     ASSERT_OK(metadata_file_writer->Append(Slice("a")));
     metadata_file_writer->Close();
   };
@@ -2096,7 +2138,9 @@ TEST_F(LogBlockManagerTest, TestHalfPresentContainer) {
     file_cache_.Invalidate(data_file_name);
 
     unique_ptr<WritableFile> data_file_writer;
-    ASSERT_OK(env_->NewWritableFile(data_file_name, &data_file_writer));
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    ASSERT_OK(env_->NewWritableFile(opts, data_file_name, &data_file_writer));
     data_file_writer->Close();
   };
 
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index a9973dd..d5381fa 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -828,6 +828,7 @@ Status LogBlockContainer::Create(LogBlockManager* 
block_manager,
       RWFileOptions rw_opts;
 
       rw_opts.mode = Env::MUST_CREATE;
+      rw_opts.is_sensitive = true;
       metadata_status = block_manager->env()->NewRWFile(
           rw_opts, metadata_path, &rwf);
       metadata_writer.reset(rwf.release());
@@ -879,6 +880,7 @@ Status LogBlockContainer::Open(LogBlockManager* 
block_manager,
   } else {
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     unique_ptr<RWFile> rwf;
     RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->env()->NewRWFile(opts,
         metadata_path, &rwf));
@@ -948,7 +950,9 @@ Status 
LogBlockContainer::CheckContainerFiles(LogBlockManager* block_manager,
     Status read_status;
     BlockIdSet live_blocks;
     unique_ptr<RandomAccessFile> reader;
-    
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(env->NewRandomAccessFile(metadata_path, 
&reader));
+    RandomAccessFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK_CONTAINER_DISK_FAILURE(env->NewRandomAccessFile(opts, 
metadata_path, &reader));
     ReadablePBContainerFile pb_reader(std::move(reader));
     RETURN_NOT_OK_CONTAINER_DISK_FAILURE(pb_reader.Open());
     while (true) {
@@ -1004,8 +1008,10 @@ Status LogBlockContainer::ProcessRecords(
     uint64_t* max_block_id) {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
+  RandomAccessFileOptions opts;
+  opts.is_sensitive = true;
   RETURN_NOT_OK_HANDLE_ERROR(block_manager()->env()->NewRandomAccessFile(
-      metadata_path, &metadata_reader));
+      opts, metadata_path, &metadata_reader));
   ReadablePBContainerFile pb_reader(std::move(metadata_reader));
   RETURN_NOT_OK_HANDLE_ERROR(pb_reader.Open());
 
@@ -1268,6 +1274,7 @@ Status LogBlockContainer::ReopenMetadataWriter() {
     unique_ptr<RWFile> f_uniq;
     RWFileOptions opts;
     opts.mode = Env::MUST_EXIST;
+    opts.is_sensitive = true;
     RETURN_NOT_OK_HANDLE_ERROR(block_manager_->env_->NewRWFile(opts,
         metadata_file_->filename(), &f_uniq));
     f.reset(f_uniq.release());
@@ -2903,6 +2910,7 @@ Status LogBlockManager::Repair(
       unique_ptr<RWFile> file;
       RWFileOptions opts;
       opts.mode = Env::MUST_EXIST;
+      opts.is_sensitive = true;
       RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(
           env_->NewRWFile(opts,
                           StrCat(pr.container, kContainerMetadataFileSuffix),
@@ -3076,7 +3084,9 @@ Status LogBlockManager::RewriteMetadataFile(const 
LogBlockContainer& container,
   string tmpl = metadata_file_name + kTmpInfix + ".XXXXXX";
   unique_ptr<RWFile> tmp_file;
   string tmp_file_name;
-  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->NewTempRWFile(RWFileOptions(), 
tmpl,
+  RWFileOptions opts;
+  opts.is_sensitive = true;
+  RETURN_NOT_OK_LBM_DISK_FAILURE_PREPEND(env_->NewTempRWFile(opts, tmpl,
                                                              &tmp_file_name, 
&tmp_file),
                                          "could not create temporary metadata 
file");
   auto tmp_deleter = MakeScopedCleanup([&]() {
diff --git a/src/kudu/integration-tests/dense_node-itest.cc b/src/kudu/integration-tests/dense_node-itest.cc
index 2e37625..c63ce2b 100644
--- a/src/kudu/integration-tests/dense_node-itest.cc
+++ b/src/kudu/integration-tests/dense_node-itest.cc
@@ -81,9 +81,13 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
-class DenseNodeTest : public ExternalMiniClusterITestBase {
+class DenseNodeTest :
+  public ExternalMiniClusterITestBase,
+  public testing::WithParamInterface<bool> {
 };
 
+INSTANTIATE_TEST_SUITE_P(, DenseNodeTest, testing::Values(false, true));
+
 // Integration test that simulates "dense" Kudu nodes.
 //
 // Storage heavy deployments can be created by running a data-intensive
@@ -92,7 +96,7 @@ class DenseNodeTest : public ExternalMiniClusterITestBase {
 // of metadata with a minimal amount of data. The scale of the metadata can
 // proxy for data in areas we care about (such as start up time, thread count,
 // memory usage, etc.).
-TEST_F(DenseNodeTest, RunTest) {
+TEST_P(DenseNodeTest, RunTest) {
   ExternalMiniClusterOptions opts;
 
   opts.extra_tserver_flags = {
@@ -148,6 +152,11 @@ TEST_F(DenseNodeTest, RunTest) {
     opts.extra_master_flags.emplace_back("--never_fsync=false");
   }
 
+  if (GetParam()) {
+    opts.extra_master_flags.emplace_back("--encrypt_data_at_rest=true");
+    opts.extra_tserver_flags.emplace_back("--encrypt_data_at_rest=true");
+  }
+
   // With the amount of data we're going to write, we need to make sure the
   // tserver has enough time to start back up (startup is only considered to be
   // "complete" when the tserver has loaded all fs metadata from disk).
diff --git a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
index 8b8a2be..37b2eb7 100644
--- a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
+++ b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <set>
-#include <type_traits>
 #include <utility>
 
 #include <glog/logging.h>
@@ -166,7 +165,7 @@ Status MiniClusterFsInspector::ReadTabletSuperBlockOnTS(int 
ts_idx,
                                                         const string& 
tablet_id,
                                                         TabletSuperBlockPB* 
sb) {
   const auto& sb_path = GetTabletSuperBlockPathOnTS(ts_idx, tablet_id);
-  return pb_util::ReadPBContainerFromPath(env_, sb_path, sb);
+  return pb_util::ReadPBContainerFromPath(env_, sb_path, sb, 
pb_util::SENSITIVE);
 }
 
 int64_t MiniClusterFsInspector::GetTabletSuperBlockMTimeOrDie(int ts_idx,
@@ -191,7 +190,7 @@ Status 
MiniClusterFsInspector::ReadConsensusMetadataOnTS(int ts_idx,
   if (!env_->FileExists(cmeta_path)) {
     return Status::NotFound("Consensus metadata file not found", cmeta_path);
   }
-  return pb_util::ReadPBContainerFromPath(env_, cmeta_path, cmeta_pb);
+  return pb_util::ReadPBContainerFromPath(env_, cmeta_path, cmeta_pb, 
pb_util::SENSITIVE);
 }
 
 Status MiniClusterFsInspector::WriteConsensusMetadataOnTS(
@@ -200,7 +199,8 @@ Status MiniClusterFsInspector::WriteConsensusMetadataOnTS(
     const ConsensusMetadataPB& cmeta_pb) {
   auto cmeta_path = GetConsensusMetadataPathOnTS(ts_idx, tablet_id);
   return pb_util::WritePBContainerToPath(env_, cmeta_path, cmeta_pb,
-                                         pb_util::OVERWRITE, pb_util::NO_SYNC);
+                                         pb_util::OVERWRITE, pb_util::NO_SYNC,
+                                         pb_util::SENSITIVE);
 }
 
 
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 1ee3ecb..038c05c 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -93,6 +93,7 @@ DECLARE_int32(num_client_threads);
 DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 DECLARE_int32(rpc_timeout);
+DECLARE_bool(encrypt_data_at_rest);
 
 METRIC_DECLARE_entity(server);
 METRIC_DECLARE_entity(tablet);
@@ -792,14 +793,21 @@ TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
   NO_FATALS(AssertAllReplicasAgree(0));
 }
 
+class RaftConsensusParamEncryptionITest :
+    public RaftConsensusITest,
+    public ::testing::WithParamInterface<bool> {
+};
+INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, RaftConsensusParamEncryptionITest,
+                         ::testing::Values(false, true));
+
 // Test that when a follower is stopped for a long time, the log cache
 // properly evicts operations, but still allows the follower to catch
 // up when it comes back.
 //
 // Also asserts that the other replicas retain logs for the stopped
 // follower to catch up from.
-TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
-  const vector<string> kTsFlags = {
+TEST_P(RaftConsensusParamEncryptionITest, TestCatchupAfterOpsEvicted) {
+  vector<string> kTsFlags = {
     "--log_cache_size_limit_mb=1",
     "--consensus_max_batch_size_bytes=500000",
     // Use short and synchronous rolls so that we can test log segment 
retention.
@@ -815,6 +823,13 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
     "--log_compression_codec=no_compression"
   };
 
+  if (GetParam()) {
+    // We need to enable encryption both in the mini-cluster and in the current
+    // process, as both of them access encrypted files.
+    kTsFlags.emplace_back("--encrypt_data_at_rest=true");
+    FLAGS_encrypt_data_at_rest = true;
+  }
+
   NO_FATALS(BuildAndStart(kTsFlags));
   TServerDetails* replica = (*tablet_replicas_.begin()).second;
   ASSERT_TRUE(replica != nullptr);
@@ -871,11 +886,17 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
 // itself.
 //
 // This is a regression test for KUDU-775 and KUDU-562.
-TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
+TEST_P(RaftConsensusParamEncryptionITest, TestFollowerFallsBehindLeaderGC) {
   vector<string> ts_flags = {
     // Disable follower eviction to maintain the original intent of this test.
     "--evict_failed_followers=false",
   };
+  if (GetParam()) {
+    // We need to enable encryption both in the mini-cluster and in the current
+    // process, as both of them access encrypted files.
+    ts_flags.emplace_back("--encrypt_data_at_rest=true");
+    FLAGS_encrypt_data_at_rest = true;
+  }
   AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
   NO_FATALS(BuildAndStart(ts_flags));
 
diff --git a/src/kudu/integration-tests/security-itest.cc b/src/kudu/integration-tests/security-itest.cc
index 60646aa..8b87b90 100644
--- a/src/kudu/integration-tests/security-itest.cc
+++ b/src/kudu/integration-tests/security-itest.cc
@@ -105,6 +105,7 @@ class SecurityITest : public KuduTest {
  public:
   SecurityITest() {
     cluster_opts_.enable_kerberos = true;
+    cluster_opts_.enable_encryption = true;
     cluster_opts_.num_tablet_servers = 3;
     cluster_opts_.extra_master_flags.emplace_back("--rpc_trace_negotiation");
     cluster_opts_.extra_tserver_flags.emplace_back("--rpc_trace_negotiation");
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index d20355d..a9f1ddc 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -124,6 +124,7 @@ ExternalMiniClusterOptions::ExternalMiniClusterOptions()
       principal("kudu"),
       hms_mode(HmsMode::NONE),
       enable_ranger(false),
+      enable_encryption(false),
       logtostderr(true),
       start_process_timeout(MonoDelta::FromSeconds(70)),
       rpc_negotiation_timeout(MonoDelta::FromSeconds(3))
@@ -565,6 +566,7 @@ Status ExternalMiniCluster::AddTabletServer() {
 
   ExternalDaemonOptions opts;
   opts.messenger = messenger_;
+  opts.enable_encryption = opts_.enable_encryption;
   opts.block_manager_type = opts_.block_manager_type;
   opts.exe = GetBinaryPath(kKuduBinaryName);
   opts.wal_dir = GetWalPath(daemon_id);
@@ -1127,6 +1129,10 @@ std::vector<std::string> 
ExternalDaemon::GetDaemonFlags(const ExternalDaemonOpti
     flags.emplace_back("--logbuflevel=-1");
   }
 
+  if (opts.enable_encryption) {
+    flags.emplace_back("--encrypt_data_at_rest=true");
+  }
+
   // If large keys are not enabled.
   if (!UseLargeKeys()) {
     // Generate smaller RSA keys -- generating a 768-bit key is faster
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index bcd7c26..44dd5d5 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -210,6 +211,11 @@ struct ExternalMiniClusterOptions {
   // Default: false.
   bool enable_ranger;
 
+  // If true, enable data at rest encryption.
+  //
+  // Default: false.
+  bool enable_encryption;
+
   // If true, sends logging output to stderr instead of a log file.
   //
   // Default: true.
@@ -544,10 +550,12 @@ class ExternalMiniCluster : public MiniCluster {
 
 struct ExternalDaemonOptions {
   ExternalDaemonOptions()
-      : logtostderr(false) {
+      : logtostderr(false),
+        enable_encryption(false) {
   }
 
   bool logtostderr;
+  bool enable_encryption;
   std::shared_ptr<rpc::Messenger> messenger;
   std::string block_manager_type;
   std::string exe;
diff --git a/src/kudu/postgres/mini_postgres.cc b/src/kudu/postgres/mini_postgres.cc
index 8146cef..aafc8f0 100644
--- a/src/kudu/postgres/mini_postgres.cc
+++ b/src/kudu/postgres/mini_postgres.cc
@@ -138,7 +138,9 @@ Status MiniPostgres::CreateConfigs() {
   ReadFileToString(env, config_file, &config);
   config.append(Substitute("\nlisten_addresses = '$0'\nport = $1\n", host_, 
port_));
   unique_ptr<WritableFile> file;
-  RETURN_NOT_OK(env->NewWritableFile(config_file, &file));
+  WritableFileOptions opts;
+  opts.is_sensitive = false;
+  RETURN_NOT_OK(env->NewWritableFile(opts, config_file, &file));
   RETURN_NOT_OK(file->Append(config));
   return file->Close();
 }
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index e69ce98..eead456 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -297,7 +297,9 @@ Status GetOrCreateLog4j2PropertiesFile(Env* env, string* logging_properties_path
     // don't read a partial file (not expected, but just in case).
     unique_ptr<WritableFile> tmp_file;
     string tmp_path;
-    RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(),
+    WritableFileOptions opts;
+    opts.is_sensitive = false;
+    RETURN_NOT_OK(env->NewTempWritableFile(opts,
                                            Substitute("$0.XXXXXX", 
log4j2_properties_path),
                                            &tmp_path, &tmp_file));
     // If anything fails, clean up the tmp file.
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 727bfe4..831699c 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -310,8 +310,10 @@ Status MiniKdc::Kinit(const string& username) {
   unique_ptr<WritableFile> tmp_cc_file;
   string tmp_cc_path;
   const auto tmp_template = Substitute("kinit-temp-$0.XXXXXX", username);
+  WritableFileOptions opts;
+  opts.is_sensitive = false;
   RETURN_NOT_OK_PREPEND(Env::Default()->NewTempWritableFile(
-      WritableFileOptions(),
+      opts,
       JoinPathSegments(options_.data_root, tmp_template),
       &tmp_cc_path, &tmp_cc_file),
       "could not create temporary file");
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 00b5de1..aabcb7d 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -700,7 +700,8 @@ Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) {
   string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
   RETURN_NOT_OK_PREPEND(pb_util::WritePBContainerToPath(
                             fs_manager_->env(), path, pb,
-                            pb_util::OVERWRITE, pb_util::SYNC),
+                            pb_util::OVERWRITE, pb_util::SYNC,
+                            pb_util::SENSITIVE),
                         Substitute("Failed to write tablet metadata $0", 
tablet_id_));
   flush_count_for_tests_++;
   RETURN_NOT_OK(UpdateOnDiskSize());
@@ -721,7 +722,7 @@ boost::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() co
 Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) 
const {
   string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
   RETURN_NOT_OK_PREPEND(
-      pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, superblock),
+      pb_util::ReadPBContainerFromPath(fs_manager_->env(), path, superblock, 
pb_util::SENSITIVE),
       Substitute("Could not load tablet metadata from $0", path));
   return Status::OK();
 }
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index eac8301..c34bdd0 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -644,10 +644,7 @@ class ToolTest : public KuduTest {
   // Write YAML content to file ${KUDU_CONFIG}/kudurc.
   void PrepareConfigFile(const string& content) {
     string fname = GetTestPath("kudurc");
-    unique_ptr<WritableFile> writable_file;
-    ASSERT_OK(env_->NewWritableFile(fname, &writable_file));
-    ASSERT_OK(writable_file->Append(Slice(content)));
-    ASSERT_OK(writable_file->Close());
+    ASSERT_OK(WriteStringToFile(env_, content, fname));
   }
 
   void CheckCorruptClusterInfoConfigFile(const string& content,
diff --git a/src/kudu/tools/tool_action_pbc.cc b/src/kudu/tools/tool_action_pbc.cc
index be5c636..81c97ff 100644
--- a/src/kudu/tools/tool_action_pbc.cc
+++ b/src/kudu/tools/tool_action_pbc.cc
@@ -103,6 +103,13 @@ namespace {
 
 const char* const kPathArg = "path";
 
+bool IsFileEncrypted(Env* env, const std::string& fname) {
+  // TODO(abukor): replace with real encryption check. As instance files are the
+  // only PBC files that are unencrypted right now, this check will suffice
+  // until we can tell if a file is encrypted based on an encryption header.
+  return fname.length() < 8 || fname.compare(fname.length() - 8, 8, "instance") != 0;
+}
+
 Status DumpPBContainerFile(const RunnerContext& context) {
   const string& path = FindOrDie(context.required_args, kPathArg);
   auto format = ReadablePBContainerFile::Format::DEFAULT;
@@ -118,7 +125,9 @@ Status DumpPBContainerFile(const RunnerContext& context) {
 
   Env* env = Env::Default();
   unique_ptr<RandomAccessFile> reader;
-  RETURN_NOT_OK(env->NewRandomAccessFile(path, &reader));
+  RandomAccessFileOptions opts;
+  opts.is_sensitive = IsFileEncrypted(env, path);
+  RETURN_NOT_OK(env->NewRandomAccessFile(opts, path, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
   RETURN_NOT_OK(pb_reader.Open());
   RETURN_NOT_OK(pb_reader.Dump(&std::cout, format));
@@ -152,7 +161,9 @@ Status EditFile(const RunnerContext& context) {
 
   // Open the original file.
   unique_ptr<RandomAccessFile> reader;
-  RETURN_NOT_OK(env->NewRandomAccessFile(path, &reader));
+  RandomAccessFileOptions reader_opts;
+  reader_opts.is_sensitive = IsFileEncrypted(env, path);
+  RETURN_NOT_OK(env->NewRandomAccessFile(reader_opts, path, &reader));
   ReadablePBContainerFile pb_reader(std::move(reader));
   RETURN_NOT_OK(pb_reader.Open());
 
@@ -160,18 +171,23 @@ Status EditFile(const RunnerContext& context) {
   // Do this up front so that we fail early if the user doesn't have appropriate permissions.
   const string tmp_out_path = path + ".new";
   unique_ptr<RWFile> out_rwfile;
-  RETURN_NOT_OK_PREPEND(env->NewRWFile(tmp_out_path, &out_rwfile), "couldn't 
open output PBC file");
+  RWFileOptions out_rwfile_opts;
+  out_rwfile_opts.is_sensitive = IsFileEncrypted(env, path);
+  RETURN_NOT_OK_PREPEND(env->NewRWFile(out_rwfile_opts, tmp_out_path, 
&out_rwfile),
+                        "couldn't open output PBC file");
   auto delete_tmp_output = MakeScopedCleanup([&]() {
     WARN_NOT_OK(env->DeleteFile(tmp_out_path),
                 "Could not delete file " + tmp_out_path);
   });
 
   // Also make a tmp file where we'll write the PBC in JSON format for
-  // easy editing.
+  // easy editing. Encryption needs to be disabled for the tmp file.
   unique_ptr<WritableFile> tmp_json_file;
   string tmp_json_path;
+  WritableFileOptions tmp_json_opts;
+  tmp_json_opts.is_sensitive = false;
   const string tmp_template = Substitute("pbc-edit$0.XXXXXX", kTmpInfix);
-  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(WritableFileOptions(),
+  RETURN_NOT_OK_PREPEND(env->NewTempWritableFile(tmp_json_opts,
                                                  JoinPathSegments(dir, 
tmp_template),
                                                  &tmp_json_path, 
&tmp_json_file),
                         "couldn't create temporary file");
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index c9ff0d2..f8fd978 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -488,8 +488,9 @@ Status TabletCopyClient::Finish() {
   if (FLAGS_tablet_copy_save_downloaded_metadata) {
     string meta_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
     string meta_copy_path = Substitute("$0.copy.$1$2", meta_path, 
start_time_micros_, kTmpInfix);
-    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path,
-                                   WritableFileOptions()),
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), meta_path, meta_copy_path, 
opts),
                           "Unable to make copy of tablet metadata");
   }
 
@@ -766,6 +767,7 @@ Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
 
   WritableFileOptions opts;
   opts.sync_on_close = true;
+  opts.is_sensitive = true;
   unique_ptr<WritableFile> writer;
   RETURN_NOT_OK_PREPEND(fs_manager_->env()->NewWritableFile(opts, dest_path, 
&writer),
                         "Unable to open file for writing");
@@ -793,8 +795,9 @@ Status TabletCopyClient::WriteConsensusMetadata() {
   if (FLAGS_tablet_copy_save_downloaded_metadata) {
     string cmeta_path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
     string cmeta_copy_path = Substitute("$0.copy.$1$2", cmeta_path, 
start_time_micros_, kTmpInfix);
-    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), cmeta_path, cmeta_copy_path,
-                                   WritableFileOptions()),
+    WritableFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK_PREPEND(CopyFile(Env::Default(), cmeta_path, 
cmeta_copy_path, opts),
                           "Unable to make copy of consensus metadata");
   }
 
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 94fc6e0..eb12fe1 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -252,6 +252,7 @@ class TabletCopyTest : public KuduTabletTest {
 
     // Write the file to a temporary location.
     WritableFileOptions opts;
+    opts.is_sensitive = true;
     string path_template = GetTestPath(Substitute("test_block_$0$1.XXXXXX",
                                                   block_id.ToString(),
                                                   kTmpInfix));
@@ -260,7 +261,9 @@ class TabletCopyTest : public KuduTabletTest {
     CHECK_OK(writable_file->Append(Slice(data.data(), data.size())));
     CHECK_OK(writable_file->Close());
 
-    CHECK_OK(Env::Default()->NewSequentialFile(*path, file));
+    SequentialFileOptions seq_opts;
+    seq_opts.is_sensitive = true;
+    CHECK_OK(Env::Default()->NewSequentialFile(seq_opts, *path, file));
   }
 
   MetricRegistry metric_registry_;
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 4ea9ac2..951ee14 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2289,7 +2289,7 @@ TEST_P(ScanCorruptedDeltasParamTest, Test) {
 
   // Flush the corruption and rebuild the server with the corrupt data.
   ASSERT_OK(pb_util::WritePBContainerToPath(env_,
-      meta_path, superblock_pb, pb_util::OVERWRITE, pb_util::SYNC));
+      meta_path, superblock_pb, pb_util::OVERWRITE, pb_util::SYNC, 
pb_util::SENSITIVE));
   ASSERT_OK(ShutdownAndRebuildTablet());
   LOG(INFO) << Substitute("Rebuilt tablet $0 with broken blocks", 
tablet_replica_->tablet_id());
 
@@ -4361,7 +4361,7 @@ TEST_F(TabletServerTest, TestDataDirGroupsCreated) {
   string tablet_meta_path = 
JoinPathSegments(GetTestPath("TabletServerTest-fsroot"), "tablet-meta");
   string pb_path = JoinPathSegments(tablet_meta_path, 
tablet_replica_->tablet_id());
   ASSERT_OK(pb_util::WritePBContainerToPath(Env::Default(),
-      pb_path, superblock, pb_util::OVERWRITE, pb_util::SYNC));
+      pb_path, superblock, pb_util::OVERWRITE, pb_util::SYNC, 
pb_util::SENSITIVE));
 
   // Verify that the on-disk copy has its DataDirGroup missing.
   
ASSERT_OK(tablet_replica_->tablet()->metadata()->ReadSuperBlockFromDisk(&superblock));
diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc
index 5dbe4cf..88b31b1 100644
--- a/src/kudu/util/env-test.cc
+++ b/src/kudu/util/env-test.cc
@@ -1253,7 +1253,7 @@ TEST_F(TestEnv, TestEncryption) {
   const string kFile = JoinPathSegments(test_dir_, "encrypted_file");
   unique_ptr<RWFile> rw;
   RWFileOptions opts;
-  opts.encrypted = true;
+  opts.is_sensitive = true;
   ASSERT_OK(env_->NewRWFile(opts, kFile, &rw));
 
   string kTestData =
@@ -1286,7 +1286,7 @@ TEST_F(TestEnv, TestEncryption) {
 
   unique_ptr<RandomAccessFile> unencrpyted;
   RandomAccessFileOptions unencrpyted_opts;
-  unencrpyted_opts.encrypted = false;
+  unencrpyted_opts.is_sensitive = false;
   ASSERT_OK(env_->NewRandomAccessFile(unencrpyted_opts, kFile, &unencrpyted));
 
   // Treating it as an unencrypted file should yield garbage and not contain the
@@ -1298,7 +1298,7 @@ TEST_F(TestEnv, TestEncryption) {
   // Check if the file can be read into a SequentialFile.
   unique_ptr<SequentialFile> seq_file;
   SequentialFileOptions seq_opts;
-  seq_opts.encrypted = true;
+  seq_opts.is_sensitive = true;
   ASSERT_OK(env_->NewSequentialFile(seq_opts, kFile, &seq_file));
 
   ASSERT_OK(seq_file->Read(&result1));
@@ -1310,7 +1310,7 @@ TEST_F(TestEnv, TestEncryption) {
   // as an encrypted file.
   unique_ptr<RandomAccessFile> random;
   RandomAccessFileOptions random_opts;
-  random_opts.encrypted = true;
+  random_opts.is_sensitive = true;
   ASSERT_OK(env_->NewRandomAccessFile(random_opts, kFile, &random));
   size_t size = kTestData.length() + kTestData2.length();
   uint8_t scratch[size];
@@ -1330,7 +1330,7 @@ TEST_F(TestEnv, TestPreallocatedReadEncryptedFile) {
   const string kFile = JoinPathSegments(test_dir_, "encrypted_file");
   unique_ptr<RWFile> rw;
   RWFileOptions opts;
-  opts.encrypted = true;
+  opts.is_sensitive = true;
   ASSERT_OK(env_->NewRWFile(opts, kFile, &rw));
 
   ASSERT_OK(rw->PreAllocate(0, 1024, RWFile::CHANGE_FILE_SIZE));
@@ -1347,7 +1347,7 @@ TEST_F(TestEnv, TestEncryptionMultipleSlices) {
   const string kFile = JoinPathSegments(test_dir_, "encrypted_file");
   unique_ptr<RWFile> rw;
   RWFileOptions opts;
-  opts.encrypted = true;
+  opts.is_sensitive = true;
   ASSERT_OK(env_->NewRWFile(opts, kFile, &rw));
 
   vector<Slice> data = {"foo", "bar", "hello", "world"};
diff --git a/src/kudu/util/env.cc b/src/kudu/util/env.cc
index 3bc5ee7..2935fcf 100644
--- a/src/kudu/util/env.cc
+++ b/src/kudu/util/env.cc
@@ -26,9 +26,12 @@ FileLock::~FileLock() {
 
 static Status DoWriteStringToFile(Env* env, const Slice& data,
                                   const std::string& fname,
-                                  bool should_sync) {
+                                  bool should_sync,
+                                  bool is_sensitive) {
   unique_ptr<WritableFile> file;
-  Status s = env->NewWritableFile(fname, &file);
+  WritableFileOptions opts;
+  opts.is_sensitive = is_sensitive;
+  Status s = env->NewWritableFile(opts, fname, &file);
   if (!s.ok()) {
     return s;
   }
@@ -50,18 +53,20 @@ static Status DoWriteStringToFile(Env* env, const Slice& data,
 // TODO: move these utils into env_util
 Status WriteStringToFile(Env* env, const Slice& data,
                          const std::string& fname) {
-  return DoWriteStringToFile(env, data, fname, false);
+  return DoWriteStringToFile(env, data, fname, false, false);
 }
 
 Status WriteStringToFileSync(Env* env, const Slice& data,
                              const std::string& fname) {
-  return DoWriteStringToFile(env, data, fname, true);
+  return DoWriteStringToFile(env, data, fname, true, false);
 }
 
-Status ReadFileToString(Env* env, const std::string& fname, faststring* data) {
+Status DoReadFileToString(Env* env, const std::string& fname, faststring* 
data, bool is_sensitive) {
   data->clear();
   unique_ptr<SequentialFile> file;
-  Status s = env->NewSequentialFile(fname, &file);
+  SequentialFileOptions opts;
+  opts.is_sensitive = is_sensitive;
+  Status s = env->NewSequentialFile(opts, fname, &file);
   if (!s.ok()) {
     return s;
   }
@@ -81,4 +86,8 @@ Status ReadFileToString(Env* env, const std::string& fname, faststring* data) {
   return s;
 }
 
+Status ReadFileToString(Env* env, const std::string& fname, faststring* data) {
+  return DoReadFileToString(env, fname, data, false);
+}
+
 }  // namespace kudu
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 2ed394b..e3813e8 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -380,6 +380,8 @@ class Env {
   // Only useful for tests.
   static const char* const kInjectedFailureStatusMsg;
 
+  virtual const bool IsEncryptionEnabled() = 0;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(Env);
 };
@@ -414,9 +416,11 @@ class Fifo : public File {
 };
 
 struct SequentialFileOptions {
-  bool encrypted;
+  // Whether the file contains sensitive information. If true, the file is
+  // encrypted if encryption is enabled.
+  bool is_sensitive;
 
-  SequentialFileOptions() : encrypted(false) {}
+  SequentialFileOptions() : is_sensitive(false) {}
 };
 
 // A file abstraction for reading sequentially through a file
@@ -494,17 +498,21 @@ struct WritableFileOptions {
   // See CreateMode for details.
   Env::OpenMode mode;
 
-  bool encrypted;
+  // Whether the file contains sensitive information. If true, the file is
+  // encrypted if encryption is enabled.
+  bool is_sensitive;
 
   WritableFileOptions()
-      : sync_on_close(false), mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE), 
encrypted(false) {}
+      : sync_on_close(false), mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE), 
is_sensitive(false) {}
 };
 
 // Options specified when a file is opened for random access.
 struct RandomAccessFileOptions {
-  bool encrypted;
+  // Whether the file contains sensitive information. If true, the file is
+  // encrypted if encryption is enabled.
+  bool is_sensitive;
 
-  RandomAccessFileOptions() : encrypted(false) {}
+  RandomAccessFileOptions() : is_sensitive(false) {}
 };
 
 // A file abstraction for sequential writing.  The implementation
@@ -564,10 +572,12 @@ struct RWFileOptions {
   // See CreateMode for details.
   Env::OpenMode mode;
 
-  bool encrypted;
+  // Whether the file contains sensitive information. If true, the file is
+  // encrypted if encryption is enabled.
+  bool is_sensitive;
 
   RWFileOptions()
-      : sync_on_close(false), mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE), 
encrypted(false) {}
+      : sync_on_close(false), mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE), 
is_sensitive(false) {}
 };
 
 // A file abstraction for both reading and writing. No notion of a built-in
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index eec7e5e..04514bb 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -200,6 +200,10 @@ DEFINE_string(env_inject_lock_failure_globs, "",
               "will fail.");
 TAG_FLAG(env_inject_lock_failure_globs, hidden);
 
+DEFINE_bool(encrypt_data_at_rest, false,
+            "Whether sensitive files should be encrypted on the file system.");
+TAG_FLAG(encrypt_data_at_rest, hidden);
+
 static __thread uint64_t thread_local_id;
 static Atomic64 cur_thread_local_id_;
 
@@ -1300,7 +1304,7 @@ class PosixEnv : public Env {
     if (f == nullptr) {
       return IOError(fname, errno);
     }
-    result->reset(new PosixSequentialFile(fname, opts.encrypted, f));
+    result->reset(new PosixSequentialFile(fname, opts.is_sensitive && 
IsEncryptionEnabled(), f));
     return Status::OK();
   }
 
@@ -1321,7 +1325,8 @@ class PosixEnv : public Env {
       return IOError(fname, errno);
     }
 
-    result->reset(new PosixRandomAccessFile(fname, fd, opts.encrypted));
+    result->reset(new PosixRandomAccessFile(fname, fd,
+                  opts.is_sensitive && IsEncryptionEnabled()));
     return Status::OK();
   }
 
@@ -1363,7 +1368,8 @@ class PosixEnv : public Env {
     TRACE_EVENT1("io", "PosixEnv::NewRWFile", "path", fname);
     int fd;
     RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd));
-    result->reset(new PosixRWFile(fname, fd, opts.sync_on_close, 
opts.encrypted));
+    result->reset(new PosixRWFile(fname, fd, opts.sync_on_close,
+                                  opts.is_sensitive && IsEncryptionEnabled()));
     return Status::OK();
   }
 
@@ -1372,7 +1378,8 @@ class PosixEnv : public Env {
     TRACE_EVENT1("io", "PosixEnv::NewTempRWFile", "template", name_template);
     int fd = 0;
     RETURN_NOT_OK(MkTmpFile(name_template, &fd, created_filename));
-    res->reset(new PosixRWFile(*created_filename, fd, opts.sync_on_close, 
opts.encrypted));
+    res->reset(new PosixRWFile(*created_filename, fd, opts.sync_on_close,
+                               opts.is_sensitive && IsEncryptionEnabled()));
     return Status::OK();
   }
 
@@ -1988,6 +1995,8 @@ class PosixEnv : public Env {
     return result;
   }
 
+  const bool IsEncryptionEnabled() override { return 
FLAGS_encrypt_data_at_rest; }
+
  private:
   // unique_ptr Deleter implementation for fts_close
   struct FtsCloser {
@@ -2032,7 +2041,8 @@ class PosixEnv : public Env {
     if (opts.mode == MUST_EXIST) {
       RETURN_NOT_OK(GetFileSize(fname, &file_size));
     }
-    result->reset(new PosixWritableFile(fname, fd, file_size, 
opts.sync_on_close, opts.encrypted));
+    result->reset(new PosixWritableFile(fname, fd, file_size, 
opts.sync_on_close,
+                                        opts.is_sensitive && 
IsEncryptionEnabled()));
     return Status::OK();
   }
 
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index 41f0ad1..45a9ddd 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -117,6 +117,8 @@ namespace env_util {
 
 Status OpenFileForWrite(Env* env, const string& path,
                         shared_ptr<WritableFile>* file) {
-  return OpenFileForWrite(WritableFileOptions(), env, path, file);
+  WritableFileOptions opts;
+  opts.is_sensitive = true;  // Encrypted only if encryption is enabled.
+  return OpenFileForWrite(opts, env, path, file);
 }
 
@@ -132,7 +134,9 @@ Status OpenFileForWrite(const WritableFileOptions& opts,
 Status OpenFileForRandom(Env *env, const string &path,
                          shared_ptr<RandomAccessFile> *file) {
   unique_ptr<RandomAccessFile> r;
-  RETURN_NOT_OK(env->NewRandomAccessFile(path, &r));
+  RandomAccessFileOptions opts;
+  opts.is_sensitive = true;
+  RETURN_NOT_OK(env->NewRandomAccessFile(opts, path, &r));
   file->reset(r.release());
   return Status::OK();
 }
@@ -140,7 +144,9 @@ Status OpenFileForRandom(Env *env, const string &path,
 Status OpenFileForSequential(Env *env, const string &path,
                              shared_ptr<SequentialFile> *file) {
   unique_ptr<SequentialFile> r;
-  RETURN_NOT_OK(env->NewSequentialFile(path, &r));
+  SequentialFileOptions opts;
+  opts.is_sensitive = true;
+  RETURN_NOT_OK(env->NewSequentialFile(opts, path, &r));
   file->reset(r.release());
   return Status::OK();
 }
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 59c3976..70e0ef1 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -44,6 +44,7 @@
 
 DECLARE_bool(cache_force_single_shard);
 DECLARE_int32(file_cache_expiry_period_ms);
+DECLARE_bool(encrypt_data_at_rest);
 
 using std::shared_ptr;
 using std::string;
@@ -51,6 +52,12 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+namespace {
+void SetEncryptionFlags(bool encryption_enabled) {
+  FLAGS_encrypt_data_at_rest = encryption_enabled;
+}
+} // namespace
+
 namespace kudu {
 
 template <class FileType>
@@ -94,7 +101,9 @@ class FileCacheTest : public KuduTest {
 
   Status WriteTestFile(const string& name, const string& data) {
     unique_ptr<RWFile> f;
-    RETURN_NOT_OK(env_->NewRWFile(name, &f));
+    RWFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK(env_->NewRWFile(opts, name, &f));
     RETURN_NOT_OK(f->Write(0, data));
     return Status::OK();
   }
@@ -338,10 +347,15 @@ TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
   }
 }
 
-class RandomAccessFileCacheTest : public FileCacheTest<RandomAccessFile> {
+class RandomAccessFileCacheTest :
+  public FileCacheTest<RandomAccessFile>,
+  public ::testing::WithParamInterface<bool> {
 };
 
-TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
+INSTANTIATE_TEST_SUITE_P(, RandomAccessFileCacheTest, ::testing::Values(false, 
true));
+
+TEST_P(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
+  SetEncryptionFlags(GetParam());
   const string kFile = this->GetTestPath("foo");
   ASSERT_OK(this->WriteTestFile(kFile, "test data"));
 
@@ -353,10 +367,15 @@ TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
   LOG(INFO) << f->memory_footprint();
 }
 
-class RWFileCacheTest : public FileCacheTest<RWFile> {
+class RWFileCacheTest :
+  public FileCacheTest<RWFile>,
+  public ::testing::WithParamInterface<bool> {
 };
 
-TEST_F(RWFileCacheTest, TestOpenMustCreate) {
+INSTANTIATE_TEST_SUITE_P(, RWFileCacheTest, ::testing::Values(false, true));
+
+TEST_P(RWFileCacheTest, TestOpenMustCreate) {
+  SetEncryptionFlags(GetParam());
   const string kFile1 = this->GetTestPath("foo");
   const string kFile2 = this->GetTestPath("bar");
 
@@ -385,7 +404,8 @@ TEST_F(RWFileCacheTest, TestOpenMustCreate) {
   }
 }
 
-TEST_F(RWFileCacheTest, TestOpenCreateOrOpen) {
+TEST_P(RWFileCacheTest, TestOpenCreateOrOpen) {
+  SetEncryptionFlags(GetParam());
   const string kFile1 = this->GetTestPath("foo");
   const string kFile2 = this->GetTestPath("bar");
 
@@ -407,10 +427,15 @@ TEST_F(RWFileCacheTest, TestOpenCreateOrOpen) {
   ASSERT_TRUE(rwf1->Sync().IsNotFound());
 }
 
-class MixedFileCacheTest : public KuduTest {
+class MixedFileCacheTest :
+  public KuduTest,
+  public ::testing::WithParamInterface<bool> {
 };
 
-TEST_F(MixedFileCacheTest, TestBothFileTypes) {
+INSTANTIATE_TEST_SUITE_P(, MixedFileCacheTest, ::testing::Values(false, true));
+
+TEST_P(MixedFileCacheTest, TestBothFileTypes) {
+  SetEncryptionFlags(GetParam());
   const string kFile1 = GetTestPath("foo");
   const string kData1 = "test data 1";
   const string kFile2 = GetTestPath("foo2");
@@ -419,9 +444,11 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) {
   // Create the two test files.
   {
     unique_ptr<RWFile> f;
-    ASSERT_OK(env_->NewRWFile(kFile1, &f));
+    RWFileOptions opts;
+    opts.is_sensitive = true;
+    ASSERT_OK(env_->NewRWFile(opts, kFile1, &f));
     ASSERT_OK(f->Write(0, kData1));
-    ASSERT_OK(env_->NewRWFile(kFile2, &f));
+    ASSERT_OK(env_->NewRWFile(opts, kFile2, &f));
     ASSERT_OK(f->Write(0, kData2));
   }
 
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 6c02467..7be762c 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -342,6 +342,7 @@ class Descriptor<RWFile> : public RWFile {
     // The file was evicted, reopen it.
     RWFileOptions opts;
     opts.mode = Mode;
+    opts.is_sensitive = true;
     unique_ptr<RWFile> f;
     RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
 
@@ -440,7 +441,9 @@ class Descriptor<RandomAccessFile> : public RandomAccessFile {
 
     // The file was evicted, reopen it.
     unique_ptr<RandomAccessFile> f;
-    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(base_.filename(), &f));
+    RandomAccessFileOptions opts;
+    opts.is_sensitive = true;
+    RETURN_NOT_OK(base_.env()->NewRandomAccessFile(opts, base_.filename(), 
&f));
 
     // The cache will take ownership of the newly opened file.
     ScopedOpenedDescriptor<RandomAccessFile> opened(
diff --git a/src/kudu/util/pb_util-test.cc b/src/kudu/util/pb_util-test.cc
index c4d8ba3..63f3a08 100644
--- a/src/kudu/util/pb_util-test.cc
+++ b/src/kudu/util/pb_util-test.cc
@@ -72,7 +72,8 @@ class TestPBUtil : public KuduTest {
   // Since this is a unit test class, and we want it to be fast, we do not
   // fsync by default.
   Status CreateKnownGoodContainerFile(CreateMode create = OVERWRITE,
-                                      SyncMode sync = NO_SYNC);
+                                      SyncMode sync = NO_SYNC,
+                                      SensitivityMode sensitivity = 
NOT_SENSITIVE);
 
   // Create a new Protobuf Container File Writer.
   // Set version to kUseDefaultVersion to use the default version.
@@ -83,7 +84,8 @@ class TestPBUtil : public KuduTest {
   // Set version to kUseDefaultVersion to use the default version.
   Status CreateKnownGoodContainerFileWithVersion(int version,
                                                  CreateMode create = OVERWRITE,
-                                                 SyncMode sync = NO_SYNC);
+                                                 SyncMode sync = NO_SYNC,
+                                                 SensitivityMode sensitivity = 
NOT_SENSITIVE);
 
   // XORs the data in the specified range of the file at the given path.
   Status BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t 
length);
@@ -113,11 +115,13 @@ class TestPBContainerVersions : public TestPBUtil,
 INSTANTIATE_TEST_SUITE_P(SupportedVersions, TestPBContainerVersions,
                          ::testing::Values(1, 2, kUseDefaultVersion));
 
-Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode 
sync) {
+Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create,
+                                                SyncMode sync,
+                                                SensitivityMode sensitivity) {
   ProtoContainerTestPB test_pb;
   test_pb.set_name(kTestKeyvalName);
   test_pb.set_value(kTestKeyvalValue);
-  return WritePBContainerToPath(env_, path_, test_pb, create, sync);
+  return WritePBContainerToPath(env_, path_, test_pb, create, sync, 
sensitivity);
 }
 
 Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts,
@@ -133,13 +137,16 @@ Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts,
 
 Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version,
                                                            CreateMode create,
-                                                           SyncMode sync) {
+                                                           SyncMode sync,
+                                                           SensitivityMode 
sensitivity) {
   ProtoContainerTestPB test_pb;
   test_pb.set_name(kTestKeyvalName);
   test_pb.set_value(kTestKeyvalValue);
 
   unique_ptr<WritablePBContainerFile> pb_writer;
-  RETURN_NOT_OK(NewPBCWriter(version, RWFileOptions(), &pb_writer));
+  RWFileOptions opts;
+  opts.is_sensitive = sensitivity == SENSITIVE;
+  RETURN_NOT_OK(NewPBCWriter(version, opts, &pb_writer));
   RETURN_NOT_OK(pb_writer->CreateNew(test_pb));
   RETURN_NOT_OK(pb_writer->Append(test_pb));
   RETURN_NOT_OK(pb_writer->Close());
@@ -232,22 +239,25 @@ TEST_F(TestPBUtil, TestWritableFileOutputStream) {
 
 // Basic read/write test.
 TEST_F(TestPBUtil, TestPBContainerSimple) {
-  // Exercise both the SYNC and NO_SYNC codepaths, despite the fact that we
-  // aren't able to observe a difference in the test.
-  vector<SyncMode> modes = { SYNC, NO_SYNC };
-  for (SyncMode mode : modes) {
-
-    // Write the file.
-    ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE, mode));
-
-    // Read it back, should validate and contain the expected values.
-    ProtoContainerTestPB test_pb;
-    ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb));
-    ASSERT_EQ(kTestKeyvalName, test_pb.name());
-    ASSERT_EQ(kTestKeyvalValue, test_pb.value());
-
-    // Delete the file.
-    ASSERT_OK(env_->DeleteFile(path_));
+  // Exercise both the SYNC and NO_SYNC codepaths, along with SENSITIVE and
+  // NOT_SENSITIVE, despite the fact that we aren't able to observe a difference
+  // in the test.
+  vector<SyncMode> sync_modes = { SYNC, NO_SYNC };
+  vector<SensitivityMode> sensitivity_modes = { SENSITIVE, NOT_SENSITIVE };
+  for (SyncMode sync_mode : sync_modes) {
+    for (SensitivityMode sensitivity_mode : sensitivity_modes) {
+      // Write the file.
+      ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE, sync_mode, 
sensitivity_mode));
+
+      // Read it back, should validate and contain the expected values.
+      ProtoContainerTestPB test_pb;
+      ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb, 
sensitivity_mode));
+      ASSERT_EQ(kTestKeyvalName, test_pb.name());
+      ASSERT_EQ(kTestKeyvalValue, test_pb.value());
+
+      // Delete the file.
+      ASSERT_OK(env_->DeleteFile(path_));
+    }
   }
 }
 
@@ -255,7 +265,7 @@ TEST_F(TestPBUtil, TestPBContainerSimple) {
 TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test that we indicate when the file does not exist.
   ProtoContainerTestPB test_pb;
-  Status s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  Status s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsNotFound()) << "Should not be found: " << path_ << ": " << 
s.ToString();
 
   // Test that an empty file looks like corruption.
@@ -265,7 +275,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
     ASSERT_OK(env_->NewWritableFile(path_, &file));
     ASSERT_OK(file->Close());
   }
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsIncomplete()) << "Should be zero length: " << path_ << ": " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
 
@@ -274,7 +284,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   uint64_t known_good_size = 0;
   ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
   ASSERT_OK(TruncateFile(path_, known_good_size - 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   if (version_ == 1) {
     ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
   } else {
@@ -285,14 +295,14 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test corrupted magic.
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, 0, 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsCorruption()) << "Should have invalid magic: " << path_ << ": " << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Invalid magic number");
 
   // Test corrupted version.
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, 8, 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsNotSupported()) << "Should have unsupported version number: " << path_ << ": "
                                   << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), " Protobuf container has unsupported version");
@@ -301,7 +311,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   if (version_ >= 2) {
     ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
     ASSERT_OK(BitFlipFileByteRange(path_, 12, 2));
-    s = ReadPBContainerFromPath(env_, path_, &test_pb);
+    s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
     ASSERT_TRUE(s.IsCorruption()) << "Should have corrupted file header checksum: " << path_ << ": "
                                     << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "File header checksum does not match");
@@ -313,7 +323,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test corrupted data length.
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset, 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   if (version_ == 1) {
     ASSERT_TRUE(s.IsCorruption()) << s.ToString();
     ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
@@ -326,7 +336,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test corrupted data (looks like bad checksum).
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset + 4, 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
                                 << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
@@ -334,7 +344,7 @@ TEST_P(TestPBContainerVersions, TestCorruption) {
   // Test corrupted checksum.
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ASSERT_OK(BitFlipFileByteRange(path_, known_good_size - 4, 2));
-  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  s = ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE);
   ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
                                 << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
@@ -348,7 +358,9 @@ TEST_P(TestPBContainerVersions, TestPartialRecord) {
   ASSERT_OK(TruncateFile(path_, known_good_size - 2));
 
   unique_ptr<RandomAccessFile> file;
-  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  RandomAccessFileOptions opts;
+  opts.is_sensitive = false;
+  ASSERT_OK(env_->NewRandomAccessFile(opts, path_, &file));
   ReadablePBContainerFile pb_file(std::move(file));
   ASSERT_OK(pb_file.Open());
   ProtoContainerTestPB test_pb;
@@ -377,7 +389,9 @@ TEST_P(TestPBContainerVersions, TestExtraNullBytes) {
     ASSERT_OK(TruncateFile(path_, known_good_size + extra_bytes));
 
     unique_ptr<RandomAccessFile> file;
-    ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+    RandomAccessFileOptions opts;
+    opts.is_sensitive = false;
+    ASSERT_OK(env_->NewRandomAccessFile(opts, path_, &file));
     ReadablePBContainerFile pb_file(std::move(file));
     ASSERT_OK(pb_file.Open());
     ProtoContainerTestPB test_pb;
@@ -409,13 +423,16 @@ TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
   unique_ptr<WritablePBContainerFile> writer;
   RWFileOptions opts;
   opts.mode = Env::MUST_EXIST;
+  opts.is_sensitive = false;
   ASSERT_OK(NewPBCWriter(version_, opts, &writer));
   ASSERT_OK(writer->OpenExisting());
 
   ASSERT_OK(TruncateFile(path_, known_good_size - 2));
 
   unique_ptr<RandomAccessFile> file;
-  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  RandomAccessFileOptions random_opts;
+  random_opts.is_sensitive = false;
+  ASSERT_OK(env_->NewRandomAccessFile(random_opts, path_, &file));
   ReadablePBContainerFile reader(std::move(file));
   ASSERT_OK(reader.Open());
   ProtoContainerTestPB test_pb;
@@ -450,7 +467,7 @@ TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
 TEST_P(TestPBContainerVersions, TestSingleMessage) {
   ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
   ProtoContainerTestPB test_pb;
-  ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb));
+  ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb, NOT_SENSITIVE));
   ASSERT_EQ(kTestKeyvalName, test_pb.name());
   ASSERT_EQ(kTestKeyvalValue, test_pb.value());
 }
diff --git a/src/kudu/util/pb_util.cc b/src/kudu/util/pb_util.cc
index cfdff3d..f049508 100644
--- a/src/kudu/util/pb_util.cc
+++ b/src/kudu/util/pb_util.cc
@@ -62,7 +62,6 @@
 #include "kudu/util/debug/sanitizer_scopes.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
 #include "kudu/util/faststring.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/logging.h"
@@ -516,7 +515,9 @@ Status WritePBToPath(Env* env, const std::string& path,
   string tmp_path;
 
   unique_ptr<WritableFile> file;
-  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
+  WritableFileOptions opts;
+  opts.is_sensitive = false;
+  RETURN_NOT_OK(env->NewTempWritableFile(opts, tmp_template, &tmp_path, &file));
   auto tmp_deleter = MakeScopedCleanup([&]() {
     WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
   });
@@ -540,8 +541,10 @@ Status WritePBToPath(Env* env, const std::string& path,
 }
 
 Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
-  shared_ptr<SequentialFile> rfile;
-  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile));
+  unique_ptr<SequentialFile> rfile;
+  SequentialFileOptions opts;
+  opts.is_sensitive = false;
+  RETURN_NOT_OK(env->NewSequentialFile(opts, path, &rfile));
   RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get()));
   return Status::OK();
 }
@@ -1042,9 +1045,12 @@ uint64_t ReadablePBContainerFile::offset() const {
   return offset_;
 }
 
-Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
+Status ReadPBContainerFromPath(Env* env, const std::string& path,
+                               Message* msg, SensitivityMode sensitivity_mode) {
   unique_ptr<RandomAccessFile> file;
-  RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));
+  RandomAccessFileOptions opts;
+  opts.is_sensitive = sensitivity_mode == SENSITIVE;
+  RETURN_NOT_OK(env->NewRandomAccessFile(opts, path, &file));
 
   ReadablePBContainerFile pb_file(std::move(file));
   RETURN_NOT_OK(pb_file.Open());
@@ -1055,7 +1061,8 @@ Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg)
 Status WritePBContainerToPath(Env* env, const std::string& path,
                               const Message& msg,
                               CreateMode create,
-                              SyncMode sync) {
+                              SyncMode sync,
+                              SensitivityMode sensitivity_mode) {
   TRACE_EVENT2("io", "WritePBContainerToPath",
                "path", path,
                "msg_type", msg.GetTypeName());
@@ -1068,7 +1075,9 @@ Status WritePBContainerToPath(Env* env, const std::string& path,
   string tmp_path;
 
   unique_ptr<RWFile> file;
-  RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file));
+  RWFileOptions opts;
+  opts.is_sensitive = sensitivity_mode == SENSITIVE;
+  RETURN_NOT_OK(env->NewTempRWFile(opts, tmp_template, &tmp_path, &file));
   auto tmp_deleter = MakeScopedCleanup([&]() {
     WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
   });
diff --git a/src/kudu/util/pb_util.h b/src/kudu/util/pb_util.h
index 0ad4cba..732a34e 100644
--- a/src/kudu/util/pb_util.h
+++ b/src/kudu/util/pb_util.h
@@ -66,6 +66,11 @@ enum CreateMode {
   NO_OVERWRITE
 };
 
+enum SensitivityMode {
+  SENSITIVE,
+  NOT_SENSITIVE
+};
+
 enum class FileState {
   NOT_INITIALIZED,
   OPEN,
@@ -469,7 +474,8 @@ class ReadablePBContainerFile {
 // If the file does not exist, returns Status::NotFound(). Otherwise, may
 // return other Status error codes such as Status::IOError.
 Status ReadPBContainerFromPath(Env* env, const std::string& path,
-                               google::protobuf::Message* msg);
+                               google::protobuf::Message* msg,
+                               SensitivityMode sensitivity_mode);
 
 // Serialize a "containerized" protobuf to the given path.
 //
@@ -478,7 +484,8 @@ Status ReadPBContainerFromPath(Env* env, const std::string& path,
 Status WritePBContainerToPath(Env* env, const std::string& path,
                               const google::protobuf::Message& msg,
                               CreateMode create,
-                              SyncMode sync);
+                              SyncMode sync,
+                              SensitivityMode sensitivity_mode);
 
 // Wrapper for a protobuf message which lazily converts to JSON when
 // the trace buffer is dumped.
diff --git a/src/kudu/util/rolling_log.cc b/src/kudu/util/rolling_log.cc
index 9f35e0b..74dd1a8 100644
--- a/src/kudu/util/rolling_log.cc
+++ b/src/kudu/util/rolling_log.cc
@@ -160,6 +160,7 @@ Status RollingLog::Open() {
     // Logs aren't worth the performance cost of durability.
     opts.sync_on_close = false;
     opts.mode = Env::MUST_CREATE;
+    opts.is_sensitive = false;
 
     RETURN_NOT_OK(env_->NewWritableFile(opts, path, &file_));
 
@@ -248,7 +249,9 @@ class ScopedGzipCloser {
 // and is less likely to be problematic.
 Status RollingLog::CompressFile(const std::string& path) const {
   unique_ptr<SequentialFile> in_file;
-  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(path, &in_file),
+  SequentialFileOptions opts;
+  opts.is_sensitive = false;
+  RETURN_NOT_OK_PREPEND(env_->NewSequentialFile(opts, path, &in_file),
                         "Unable to open input file to compress");
 
   string gz_path = path + ".gz";
diff --git a/src/kudu/util/yamlreader-test.cc b/src/kudu/util/yamlreader-test.cc
index bfae8c3..9a302ff 100644
--- a/src/kudu/util/yamlreader-test.cc
+++ b/src/kudu/util/yamlreader-test.cc
@@ -43,9 +43,7 @@ public:
   Status GenerateYamlReader(const string& content, unique_ptr<YamlReader>* result) {
     string fname = GetTestPath("YamlReaderTest.json");
     unique_ptr<WritableFile> writable_file;
-    RETURN_NOT_OK(env_->NewWritableFile(fname, &writable_file));
-    RETURN_NOT_OK(writable_file->Append(Slice(content)));
-    RETURN_NOT_OK(writable_file->Close());
+    RETURN_NOT_OK(WriteStringToFile(env_, content, fname));
     result->reset(new YamlReader(fname));
     return Status::OK();
   }

Reply via email to