This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b67c0906f596ca336d0ea0e8cbc618a20ac0e563
Author: Thomas Tauber-Marshall <tmarsh...@cloudera.com>
AuthorDate: Tue Mar 24 14:04:53 2020 -0700

    IMPALA-9692 (part 2): Refactor parts of TExecPlanFragmentInfo to protobuf
    
    The new admission control service will be written in protobuf, so
    there are various admission control related structures currently
    stored in Thrift that it would be convenient to convert to protobuf,
    to minimize the amount of converting back and forth that needs to be
    done.
    
    This patch converts some portions of TExecPlanFragmentInfo to
    protobuf. TExecPlanFragmentInfo is sent as a sidecar with the Exec()
    rpc, so the refactored parts are now just directly included in the
    ExecQueryFInstancesRequestPB.
    
    The portions that are converted are those that are part of the
    QuerySchedule, in particular the TPlanFragmentDestination,
    TScanRangeParams, and TJoinBuildInput.
    
    This patch is just a refactor and doesn't contain any functional
    changes.
    
    One notable related change is that DataSink::CreateSink() has two
    parameters removed - TPlanFragmentCtx (which no longer exists) and
    TPlanFragmentInstanceCtx. These variables and the new PB equivalents
    are available via the RuntimeState that was already being passed in as
    another parameter and don't need to be individually passed in.
    
    Testing:
    - Passed a full run of existing tests.
    - Ran the single node perf test and didn't detect any regressions.
    
    Change-Id: I3a8e46767b257bbf677171ac2f4efb1b623ba41b
    Reviewed-on: http://gerrit.cloudera.org:8080/15844
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/benchmarks/expr-benchmark.cc                |  7 +-
 be/src/benchmarks/hash-benchmark.cc                |  5 +-
 be/src/codegen/llvm-codegen-test.cc                |  6 +-
 be/src/exec/blocking-join-node.cc                  | 16 +++--
 be/src/exec/data-sink.h                            |  3 +-
 be/src/exec/hbase-scan-node.cc                     | 16 ++---
 be/src/exec/hbase-table-sink.cc                    |  5 +-
 be/src/exec/hbase-table-sink.h                     |  4 +-
 be/src/exec/hdfs-scan-node-base.cc                 | 66 +++++++++---------
 be/src/exec/hdfs-scan-node-base.h                  |  2 +-
 be/src/exec/hdfs-table-sink.cc                     |  5 +-
 be/src/exec/hdfs-table-sink.h                      |  4 +-
 be/src/exec/kudu-scan-node-base.cc                 |  9 +--
 be/src/exec/kudu-table-sink.cc                     |  5 +-
 be/src/exec/kudu-table-sink.h                      |  4 +-
 be/src/exec/nested-loop-join-builder.cc            |  6 +-
 be/src/exec/nested-loop-join-builder.h             |  4 +-
 be/src/exec/partitioned-hash-join-builder.cc       |  5 +-
 be/src/exec/partitioned-hash-join-builder.h        |  4 +-
 be/src/exec/plan-root-sink.cc                      |  7 +-
 be/src/exec/plan-root-sink.h                       |  4 +-
 be/src/exec/scan-node.h                            |  7 +-
 be/src/exprs/expr-codegen-test.cc                  |  6 +-
 be/src/rpc/CMakeLists.txt                          |  2 +
 be/src/runtime/coordinator-backend-state.cc        | 35 ++++++----
 be/src/runtime/data-stream-test.cc                 | 40 +++++------
 be/src/runtime/fragment-instance-state.cc          | 34 +++++----
 be/src/runtime/fragment-instance-state.h           | 13 ++--
 be/src/runtime/fragment-state.cc                   | 52 ++++++++------
 be/src/runtime/fragment-state.h                    | 30 +++++---
 be/src/runtime/krpc-data-stream-sender.cc          | 60 ++++++++--------
 be/src/runtime/krpc-data-stream-sender.h           |  8 +--
 be/src/runtime/query-state.cc                      | 37 ++++++----
 be/src/runtime/row-batch.cc                        | 10 +--
 be/src/runtime/runtime-state.cc                    | 24 ++++---
 be/src/runtime/runtime-state.h                     | 18 +++--
 be/src/runtime/test-env.cc                         | 16 +++--
 be/src/scheduling/query-schedule.h                 |  7 +-
 be/src/scheduling/scheduler-test-util.cc           | 20 +++---
 be/src/scheduling/scheduler-test-util.h            |  3 +-
 be/src/scheduling/scheduler-test.cc                | 34 ++++-----
 be/src/scheduling/scheduler.cc                     | 80 +++++++++++++++-------
 be/src/scheduling/scheduler.h                      |  6 +-
 be/src/service/fe-support.cc                       |  5 +-
 be/src/util/CMakeLists.txt                         |  1 +
 be/src/util/compression-util.cc                    | 64 +++++++++++++++++
 .../src/util/compression-util.h                    | 28 +++-----
 be/src/util/container-util.h                       |  8 +++
 be/src/util/uid-util.h                             |  6 ++
 common/protobuf/CMakeLists.txt                     |  2 +-
 common/protobuf/common.proto                       | 21 ++++--
 common/protobuf/control_service.proto              | 73 ++++++++++++++++++++
 common/protobuf/planner.proto                      | 76 ++++++++++++++++++++
 common/protobuf/row_batch.proto                    |  2 +-
 common/thrift/ImpalaInternalService.thrift         | 58 ++--------------
 common/thrift/PlanNodes.thrift                     |  8 ++-
 56 files changed, 677 insertions(+), 404 deletions(-)

diff --git a/be/src/benchmarks/expr-benchmark.cc 
b/be/src/benchmarks/expr-benchmark.cc
index 028f137..be42114 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -84,10 +84,11 @@ class Planner {
     TNetworkAddress dummy;
     ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
     runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
-    TPlanFragmentCtx* fragment_ctx =
-        runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+    TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new 
TPlanFragment());
+    PlanFragmentCtxPB* fragment_ctx =
+        runtime_state_->obj_pool()->Add(new PlanFragmentCtxPB());
     fragment_state_ = runtime_state_->obj_pool()->Add(
-        new FragmentState(runtime_state_->query_state(), *fragment_ctx));
+        new FragmentState(runtime_state_->query_state(), *fragment, 
*fragment_ctx));
 
     return frontend_.GetExecRequest(query_ctx, result);
   }
diff --git a/be/src/benchmarks/hash-benchmark.cc 
b/be/src/benchmarks/hash-benchmark.cc
index d8e7d5e..fc28866 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -482,9 +482,10 @@ int main(int argc, char **argv) {
   }
   status = test_env.CreateQueryState(0, nullptr, &state);
   QueryState* qs = state->query_state();
-  TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
+  TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
+  PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new 
PlanFragmentCtxPB());
   FragmentState* fragment_state =
-      qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
+      qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
   if (!status.ok()) {
     cout << "Could not create RuntimeState";
     return -1;
diff --git a/be/src/codegen/llvm-codegen-test.cc 
b/be/src/codegen/llvm-codegen-test.cc
index 36b9bad..47f4254 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -52,8 +52,10 @@ class LlvmCodeGenTest : public testing:: Test {
     RuntimeState* runtime_state_;
     ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
     QueryState* qs = runtime_state_->query_state();
-    TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new 
TPlanFragmentCtx());
-    fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, 
*fragment_ctx));
+    TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
+    PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new 
PlanFragmentCtxPB());
+    fragment_state_ =
+        qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
   }
 
   virtual void TearDown() {
diff --git a/be/src/exec/blocking-join-node.cc 
b/be/src/exec/blocking-join-node.cc
index 320e5d4..da7c9a5 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -34,6 +34,7 @@
 #include "util/runtime-profile-counters.h"
 #include "util/thread.h"
 #include "util/time.h"
+#include "util/uid-util.h"
 
 #include "gen-cpp/PlanNodes_types.h"
 
@@ -189,16 +190,17 @@ Status BlockingJoinNode::OpenImpl(RuntimeState* state, 
JoinBuilder** separate_bu
   if (UseSeparateBuild(state->query_options())) {
     // Find the input fragment's build sink. We do this in the Open() phase so 
we don't
     // block this finstance's Prepare() phase on the build finstance's 
Prepare() phase.
-    const vector<TJoinBuildInput>& build_inputs =
-        state->instance_ctx().join_build_inputs;
+    const google::protobuf::RepeatedPtrField<JoinBuildInputPB>& build_inputs =
+        state->instance_ctx_pb().join_build_inputs();
     auto it = std::find_if(build_inputs.begin(), build_inputs.end(),
-        [this](const TJoinBuildInput& bi) { return bi.join_node_id == id_; });
+        [this](const JoinBuildInputPB& bi) { return bi.join_node_id() == id_; 
});
     DCHECK(it != build_inputs.end());
     FragmentInstanceState* build_finstance;
-    RETURN_IF_ERROR(state->query_state()->GetFInstanceState(
-        it->input_finstance_id, &build_finstance));
-    TDataSinkType::type build_sink_type =
-        build_finstance->fragment_ctx().fragment.output_sink.type;
+    TUniqueId input_finstance_id;
+    UniqueIdPBToTUniqueId(it->input_finstance_id(), &input_finstance_id);
+    RETURN_IF_ERROR(
+        state->query_state()->GetFInstanceState(input_finstance_id, 
&build_finstance));
+    TDataSinkType::type build_sink_type = 
build_finstance->fragment().output_sink.type;
     DCHECK(IsJoinBuildSink(build_sink_type));
     *separate_builder = build_finstance->GetJoinBuildSink();
     DCHECK(*separate_builder != nullptr);
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 3dcb268..0bf4281 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -53,8 +53,7 @@ class DataSinkConfig {
   virtual ~DataSinkConfig() {}
 
   /// Create its corresponding DataSink. Place the sink in state->obj_pool().
-  virtual DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const = 0;
+  virtual DataSink* CreateSink(RuntimeState* state) const = 0;
 
   /// Codegen expressions in the sink. Overridden by sink type which supports 
codegen.
   /// No-op by default.
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index b00929d..ef70fcd 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -104,19 +104,19 @@ Status HBaseScanNode::Prepare(RuntimeState* state) {
   // TODO(marcel): add int tuple_idx_[] indexed by TupleId somewhere in 
runtime-state.h
   tuple_idx_ = 0;
 
-  // Convert TScanRangeParams to ScanRanges
+  // Convert ScanRangeParamsPB to ScanRanges
   DCHECK(scan_range_params_ != NULL)
       << "Must call SetScanRanges() before calling Prepare()";
-  for (const TScanRangeParams& params: *scan_range_params_) {
-    DCHECK(params.scan_range.__isset.hbase_key_range);
-    const THBaseKeyRange& key_range = params.scan_range.hbase_key_range;
+  for (const ScanRangeParamsPB& params : *scan_range_params_) {
+    DCHECK(params.scan_range().has_hbase_key_range());
+    const HBaseKeyRangePB& key_range = params.scan_range().hbase_key_range();
     scan_range_vector_.push_back(HBaseTableScanner::ScanRange());
     HBaseTableScanner::ScanRange& sr = scan_range_vector_.back();
-    if (key_range.__isset.startKey) {
-      sr.set_start_key(key_range.startKey);
+    if (key_range.has_startkey()) {
+      sr.set_start_key(key_range.startkey());
     }
-    if (key_range.__isset.stopKey) {
-      sr.set_stop_key(key_range.stopKey);
+    if (key_range.has_stopkey()) {
+      sr.set_stop_key(key_range.stopkey());
     }
   }
   runtime_profile_->AddInfoString("Table Name", 
hbase_table->fully_qualified_name());
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 129ba25..9619580 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -31,9 +31,8 @@
 
 namespace impala {
 
-DataSink* HBaseTableSinkConfig::CreateSink(const TPlanFragmentCtx& 
fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const {
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+DataSink* HBaseTableSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
   return state->obj_pool()->Add(new HBaseTableSink(sink_id, *this, state));
 }
 
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index 4d702bd..60268d4 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -33,9 +33,7 @@ namespace impala {
 
 class HBaseTableSinkConfig : public DataSinkConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
 
   ~HBaseTableSinkConfig() override {}
 };
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index acf11b4..bef92fd 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -48,6 +48,7 @@
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
+#include "util/compression-util.h"
 #include "util/disk-info.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
@@ -223,14 +224,16 @@ Status HdfsScanPlanNode::Init(const TPlanNode& tnode, 
FragmentState* state) {
         *min_max_row_desc, state, &min_max_conjuncts_));
   }
 
-  const vector<const TPlanFragmentInstanceCtx*>& instance_ctxs = 
state->instance_ctxs();
+  const vector<const PlanFragmentInstanceCtxPB*>& instance_ctxs =
+      state->instance_ctx_pbs();
   for (auto ctx : instance_ctxs) {
-    auto ranges = ctx->per_node_scan_ranges.find(tnode.node_id);
-    if (ranges == ctx->per_node_scan_ranges.end()) continue;
-    for (const TScanRangeParams& scan_range_param : ranges->second) {
-      const THdfsFileSplit& split = 
scan_range_param.scan_range.hdfs_file_split;
+    auto ranges = ctx->per_node_scan_ranges().find(tnode.node_id);
+    if (ranges == ctx->per_node_scan_ranges().end()) continue;
+    for (const ScanRangeParamsPB& scan_range_param : 
ranges->second.scan_ranges()) {
+      DCHECK(scan_range_param.scan_range().has_hdfs_file_split());
+      const HdfsFileSplitPB& split = 
scan_range_param.scan_range().hdfs_file_split();
       HdfsPartitionDescriptor* partition_desc =
-          hdfs_table_->GetPartition(split.partition_id);
+          hdfs_table_->GetPartition(split.partition_id());
       scanned_file_formats_.insert(partition_desc->file_format());
     }
   }
@@ -328,28 +331,29 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   scan_node_pool_.reset(new MemPool(mem_tracker()));
 
   HdfsFsCache::HdfsFsMap fs_cache;
-  // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and 
populate
+  // Convert the ScanRangeParamsPB into per-file DiskIO::ScanRange objects and 
populate
   // partition_ids_, file_descs_, and per_type_files_.
   DCHECK(scan_range_params_ != NULL)
       << "Must call SetScanRanges() before calling Prepare()";
   int num_ranges_missing_volume_id = 0;
-  for (const TScanRangeParams& params: *scan_range_params_) {
-    DCHECK(params.scan_range.__isset.hdfs_file_split);
-    const THdfsFileSplit& split = params.scan_range.hdfs_file_split;
-    partition_ids_.insert(split.partition_id);
+  for (const ScanRangeParamsPB& params : *scan_range_params_) {
+    DCHECK(params.scan_range().has_hdfs_file_split());
+    const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
+    partition_ids_.insert(split.partition_id());
     HdfsPartitionDescriptor* partition_desc =
-        hdfs_table_->GetPartition(split.partition_id);
+        hdfs_table_->GetPartition(split.partition_id());
     if (partition_desc == NULL) {
       // TODO: this should be a DCHECK but we sometimes hit it. It's likely 
IMPALA-1702.
       LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-                 << " partition_id=" << split.partition_id
-                 << "\n" << PrintThrift(state->instance_ctx());
+                 << " partition_id=" << split.partition_id() << "\n"
+                 << PrintThrift(state->instance_ctx()) << "\n"
+                 << state->instance_ctx_pb().DebugString();
       return Status("Query encountered invalid metadata, likely due to 
IMPALA-1702."
                     " Try rerunning the query.");
     }
 
     filesystem::path file_path(partition_desc->location());
-    file_path.append(split.relative_path, filesystem::path::codecvt());
+    file_path.append(split.relative_path(), filesystem::path::codecvt());
     const string& native_file_path = file_path.native();
 
     auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
@@ -359,10 +363,10 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       // Add new file_desc to file_descs_ and per_type_files_
       file_desc = runtime_state_->obj_pool()->Add(new 
HdfsFileDesc(native_file_path));
       file_descs_[file_desc_map_key] = file_desc;
-      file_desc->file_length = split.file_length;
-      file_desc->mtime = split.mtime;
-      file_desc->file_compression = split.file_compression;
-      file_desc->is_erasure_coded = split.is_erasure_coded;
+      file_desc->file_length = split.file_length();
+      file_desc->mtime = split.mtime();
+      file_desc->file_compression = 
CompressionTypePBToThrift(split.file_compression());
+      file_desc->is_erasure_coded = split.is_erasure_coded();
       RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
           native_file_path, &file_desc->fs, &fs_cache));
       per_type_files_[partition_desc->file_format()].push_back(file_desc);
@@ -371,19 +375,19 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       file_desc = file_desc_it->second;
     }
 
-    bool expected_local = params.__isset.is_remote && !params.is_remote;
-    if (expected_local && params.volume_id == -1) 
++num_ranges_missing_volume_id;
+    bool expected_local = params.has_is_remote() && !params.is_remote();
+    if (expected_local && params.volume_id() == -1) 
++num_ranges_missing_volume_id;
 
     int cache_options = BufferOpts::NO_CACHING;
-    if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) {
+    if (params.has_try_hdfs_cache() && params.try_hdfs_cache()) {
       cache_options |= BufferOpts::USE_HDFS_CACHE;
     }
     if ((!expected_local || FLAGS_always_use_data_cache) && 
!IsDataCacheDisabled()) {
       cache_options |= BufferOpts::USE_DATA_CACHE;
     }
     file_desc->splits.push_back(
-        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), 
split.length,
-            split.offset, split.partition_id, params.volume_id, expected_local,
+        AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), 
split.length(),
+            split.offset(), split.partition_id(), params.volume_id(), 
expected_local,
             file_desc->is_erasure_coded, file_desc->mtime, 
BufferOpts(cache_options)));
   }
 
@@ -1009,17 +1013,17 @@ void HdfsScanNodeBase::TransferToScanNodePool(MemPool* 
pool) {
 }
 
 void HdfsScanNodeBase::UpdateHdfsSplitStats(
-    const vector<TScanRangeParams>& scan_range_params_list,
+    const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& 
scan_range_params_list,
     PerVolumeStats* per_volume_stats) {
   pair<int, int64_t> init_value(0, 0);
-  for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
-    const TScanRange& scan_range = scan_range_params.scan_range;
-    if (!scan_range.__isset.hdfs_file_split) continue;
-    const THdfsFileSplit& split = scan_range.hdfs_file_split;
+  for (const ScanRangeParamsPB& scan_range_params : scan_range_params_list) {
+    const ScanRangePB& scan_range = scan_range_params.scan_range();
+    if (!scan_range.has_hdfs_file_split()) continue;
+    const HdfsFileSplitPB& split = scan_range.hdfs_file_split();
     pair<int, int64_t>* stats =
-        FindOrInsert(per_volume_stats, scan_range_params.volume_id, 
init_value);
+        FindOrInsert(per_volume_stats, scan_range_params.volume_id(), 
init_value);
     ++(stats->first);
-    stats->second += split.length;
+    stats->second += split.length();
   }
 }
 
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 19c12ec..3d88706 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -414,7 +414,7 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Update the per volume stats with the given scan range params list
   static void UpdateHdfsSplitStats(
-      const std::vector<TScanRangeParams>& scan_range_params_list,
+      const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& 
scan_range_params_list,
       PerVolumeStats* per_volume_stats);
 
   /// Output the per_volume_stats to stringstream. The output format is a list 
of:
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index c1cde0b..55fe42f 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -64,9 +64,8 @@ Status HdfsTableSinkConfig::Init(
   return Status::OK();
 }
 
-DataSink* HdfsTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const {
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+DataSink* HdfsTableSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
   return state->obj_pool()->Add(
       new HdfsTableSink(sink_id, *this, 
this->tsink_->table_sink.hdfs_table_sink, state));
 }
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 2ce9d5c..9c27a53 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -96,9 +96,7 @@ struct OutputPartition {
 
 class HdfsTableSinkConfig : public DataSinkConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
   void Close() override;
 
   /// Expressions for computing the target partitions to which a row is 
written.
diff --git a/be/src/exec/kudu-scan-node-base.cc 
b/be/src/exec/kudu-scan-node-base.cc
index 04b907d..a047e75 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -78,12 +78,13 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) {
   DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL);
   tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
 
-  // Initialize the list of scan tokens to process from the TScanRangeParams.
+  // Initialize the list of scan tokens to process from the ScanRangeParamsPB.
   DCHECK(scan_range_params_ != NULL);
   int num_remote_tokens = 0;
-  for (const TScanRangeParams& params: *scan_range_params_) {
-    if (params.__isset.is_remote && params.is_remote) ++num_remote_tokens;
-    scan_tokens_.push_back(params.scan_range.kudu_scan_token);
+  for (const ScanRangeParamsPB& params : *scan_range_params_) {
+    if (params.has_is_remote() && params.is_remote()) ++num_remote_tokens;
+    DCHECK(params.scan_range().has_kudu_scan_token());
+    scan_tokens_.push_back(params.scan_range().kudu_scan_token());
   }
   COUNTER_SET(kudu_remote_tokens_, num_remote_tokens);
 
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 1b2e80d..17969fa 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -67,9 +67,8 @@ namespace impala {
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
-DataSink* KuduTableSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const {
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+DataSink* KuduTableSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
   return state->obj_pool()->Add(
       new KuduTableSink(sink_id, *this, tsink_->table_sink, state));
 }
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 99fd7af..38e12c3 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -32,9 +32,7 @@ class KuduTableDescriptor;
 
 class KuduTableSinkConfig : public DataSinkConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
 
   ~KuduTableSinkConfig() override {}
 };
diff --git a/be/src/exec/nested-loop-join-builder.cc 
b/be/src/exec/nested-loop-join-builder.cc
index 56fdeaa..1b4fc6e 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -28,11 +28,9 @@
 
 using namespace impala;
 
-DataSink* NljBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    RuntimeState* state) const {
+DataSink* NljBuilderConfig::CreateSink(RuntimeState* state) const {
   // We have one fragment per sink, so we can use the fragment index as the 
sink ID.
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  TDataSinkId sink_id = state->fragment().idx;
   return NljBuilder::CreateSeparateBuilder(sink_id, *this, state);
 }
 
diff --git a/be/src/exec/nested-loop-join-builder.h 
b/be/src/exec/nested-loop-join-builder.h
index bcaecbd..b65c13f 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -29,9 +29,7 @@ namespace impala {
 
 class NljBuilderConfig : public JoinBuilderConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
 
   ~NljBuilderConfig() override {}
 
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index 5ed95bd..bb65c7e 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -58,10 +58,9 @@ static string ConstructBuilderName(int join_node_id) {
   return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id);
 }
 
-DataSink* PhjBuilderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const {
+DataSink* PhjBuilderConfig::CreateSink(RuntimeState* state) const {
   // We have one fragment per sink, so we can use the fragment index as the 
sink ID.
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+  TDataSinkId sink_id = state->fragment().idx;
   ObjectPool* pool = state->obj_pool();
   return pool->Add(new PhjBuilder(sink_id, *this, state));
 }
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index 01d0f03..346b96c 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -51,9 +51,7 @@ class ScalarExprEvaluator;
 /// DataSinkConfig::CreateSink() are not implemented for it.
 class PhjBuilderConfig : public JoinBuilderConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
 
   /// Creates a PhjBuilder for embedded use within a PartitionedHashJoinNode.
   PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client,
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 43568d1..918da88 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -37,13 +37,12 @@ using std::unique_lock;
 
 namespace impala {
 
-DataSink* PlanRootSinkConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* 
state) const {
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
+DataSink* PlanRootSinkConfig::CreateSink(RuntimeState* state) const {
+  TDataSinkId sink_id = state->fragment().idx;
   ObjectPool* pool = state->obj_pool();
   if (state->query_options().spool_query_results) {
     return pool->Add(new BufferedPlanRootSink(
-        sink_id, *this, state, fragment_instance_ctx.debug_options));
+        sink_id, *this, state, state->instance_ctx().debug_options));
   } else {
     return pool->Add(new BlockingPlanRootSink(sink_id, *this, state));
   }
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index ecc03aa..48e5b24 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -29,9 +29,7 @@ class ScalarExprEvaluator;
 
 class PlanRootSinkConfig : public DataSinkConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
 
   ~PlanRootSinkConfig() override {}
 };
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index c02d995..e61f179 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -27,7 +27,7 @@
 namespace impala {
 
 class BlockingRowBatchQueue;
-class TScanRange;
+class ScanRangeParamsPB;
 
 class ScanPlanNode : public PlanNode {
  public:
@@ -115,7 +115,8 @@ class ScanNode : public ExecNode {
 
   /// This should be called before Prepare(), and the argument must be not 
destroyed until
   /// after Prepare().
-  void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) {
+  void SetScanRanges(
+      const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>& 
scan_range_params) {
     scan_range_params_ = &scan_range_params;
   }
 
@@ -144,7 +145,7 @@ class ScanNode : public ExecNode {
   RuntimeState* runtime_state_ = nullptr;
 
   /// The scan ranges this scan node is responsible for. Not owned.
-  const std::vector<TScanRangeParams>* scan_range_params_;
+  const google::protobuf::RepeatedPtrField<ScanRangeParamsPB>* 
scan_range_params_;
 
   /// Total bytes read from the scanner. Initialised in subclasses that track
   /// bytes read, including HDFS and HBase by calling AddBytesReadCounters().
diff --git a/be/src/exprs/expr-codegen-test.cc 
b/be/src/exprs/expr-codegen-test.cc
index c5d95a8..35d7c69 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -117,8 +117,10 @@ class ExprCodegenTest : public ::testing::Test {
     ASSERT_OK(test_env_->Init());
     ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
     QueryState* qs = runtime_state_->query_state();
-    TPlanFragmentCtx* fragment_ctx = qs->obj_pool()->Add(new TPlanFragmentCtx());
-    fragment_state_ = qs->obj_pool()->Add(new FragmentState(qs, *fragment_ctx));
+    TPlanFragment* fragment = qs->obj_pool()->Add(new TPlanFragment());
+    PlanFragmentCtxPB* fragment_ctx = qs->obj_pool()->Add(new PlanFragmentCtxPB());
+    fragment_state_ =
+        qs->obj_pool()->Add(new FragmentState(qs, *fragment, *fragment_ctx));
 
     FunctionContext::TypeDesc return_type;
     return_type.type = FunctionContext::TYPE_DECIMAL;
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index ffd4481..10ba6ac 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -24,12 +24,14 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 # Mark the protobuf files as generated
 set_source_files_properties(${COMMON_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE)
+set_source_files_properties(${PLANNER_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Rpc
   authentication.cc
   ${COMMON_PROTO_SRCS}
   cookie-util.cc
   impala-service-pool.cc
+  ${PLANNER_PROTO_SRCS}
   rpc-mgr.cc
   rpc-trace.cc
   TAcceptQueueServer.cpp
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 06e1732..01208ae 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -120,32 +120,41 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
   request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
-  fragment_info->__isset.fragment_ctxs = true;
+  fragment_info->__isset.fragments = true;
   fragment_info->__isset.fragment_instance_ctxs = true;
   fragment_info->fragment_instance_ctxs.resize(
       backend_exec_params_->instance_params.size());
   for (int i = 0; i < backend_exec_params_->instance_params.size(); ++i) {
     TPlanFragmentInstanceCtx& instance_ctx = fragment_info->fragment_instance_ctxs[i];
+    PlanFragmentInstanceCtxPB* instance_ctx_pb = request->add_fragment_instance_ctxs();
     const FInstanceExecParams& params = *backend_exec_params_->instance_params[i];
     int fragment_idx = params.fragment_exec_params.fragment.idx;
 
-    // add a TPlanFragmentCtx, if we don't already have it
-    if (fragment_info->fragment_ctxs.empty()
-        || fragment_info->fragment_ctxs.back().fragment.idx != fragment_idx) {
-      fragment_info->fragment_ctxs.emplace_back();
-      TPlanFragmentCtx& fragment_ctx = fragment_info->fragment_ctxs.back();
-      fragment_ctx.__set_fragment(params.fragment_exec_params.fragment);
-      fragment_ctx.__set_destinations(params.fragment_exec_params.destinations);
+    // add a TPlanFragment, if we don't already have it
+    if (fragment_info->fragments.empty()
+        || fragment_info->fragments.back().idx != fragment_idx) {
+      fragment_info->fragments.push_back(params.fragment_exec_params.fragment);
+      PlanFragmentCtxPB* fragment_ctx = request->add_fragment_ctxs();
+      fragment_ctx->set_fragment_idx(params.fragment_exec_params.fragment.idx);
+      *fragment_ctx->mutable_destinations() = {
+          params.fragment_exec_params.destinations.begin(),
+          params.fragment_exec_params.destinations.end()};
     }
 
     instance_ctx.fragment_idx = fragment_idx;
+    instance_ctx_pb->set_fragment_idx(fragment_idx);
     instance_ctx.fragment_instance_id = params.instance_id;
     instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
-    instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
+    for (const auto& entry : params.per_node_scan_ranges) {
+      ScanRangesPB& scan_ranges =
+          (*instance_ctx_pb->mutable_per_node_scan_ranges())[entry.first];
+      *scan_ranges.mutable_scan_ranges() = {entry.second.begin(), entry.second.end()};
+    }
     instance_ctx.__set_per_exch_num_senders(
         params.fragment_exec_params.per_exch_num_senders);
     instance_ctx.__set_sender_id(params.sender_id);
-    instance_ctx.__set_join_build_inputs(params.join_build_inputs);
+    *instance_ctx_pb->mutable_join_build_inputs() = {
+        params.join_build_inputs.begin(), params.join_build_inputs.end()};
     if (params.num_join_build_outputs != -1) {
       instance_ctx.__set_num_join_build_outputs(params.num_join_build_outputs);
     }
@@ -715,9 +724,9 @@ Coordinator::BackendState::InstanceStats::InstanceStats(
 
   // add total split size to fragment_stats->bytes_assigned()
   for (const PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) {
-    for (const TScanRangeParams& scan_range_params: entry.second) {
-      if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
-      total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length;
+    for (const ScanRangeParamsPB& scan_range_params : entry.second) {
+      if (!scan_range_params.scan_range().has_hdfs_file_split()) continue;
+      total_split_size_ += scan_range_params.scan_range().hdfs_file_split().length();
     }
   }
   (*fragment_stats->bytes_assigned())(total_split_size_);
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index ddcd84d..57fa713 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -154,10 +154,11 @@ class DataStreamTest : public testing::Test {
     ABORT_IF_ERROR(exec_env_->InitForFeTests());
     exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
     runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
-    TPlanFragmentCtx* fragment_ctx =
-        runtime_state_->obj_pool()->Add(new TPlanFragmentCtx());
+    TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new TPlanFragment());
+    PlanFragmentCtxPB* fragment_ctx =
+        runtime_state_->obj_pool()->Add(new PlanFragmentCtxPB());
     fragment_state_ = runtime_state_->obj_pool()->Add(
-        new FragmentState(runtime_state_->query_state(), *fragment_ctx));
+        new FragmentState(runtime_state_->query_state(), *fragment, *fragment_ctx));
     mem_pool_.reset(new MemPool(&tracker_));
 
     // Register a BufferPool client for allocating buffers for row batches.
@@ -176,8 +177,8 @@ class DataStreamTest : public testing::Test {
     tsort_info_.is_asc_order.push_back(true);
     tsort_info_.nulls_first.push_back(true);
 
-    next_instance_id_.lo = 0;
-    next_instance_id_.hi = 0;
+    next_instance_id_.set_lo(0);
+    next_instance_id_.set_hi(0);
     stream_mgr_ = exec_env_->stream_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
@@ -243,7 +244,7 @@ class DataStreamTest : public testing::Test {
   void Reset() {
     sender_info_.clear();
     receiver_info_.clear();
-    dest_.clear();
+    dest_.Clear();
   }
 
   ObjectPool obj_pool_;
@@ -256,7 +257,7 @@ class DataStreamTest : public testing::Test {
   boost::scoped_ptr<ExecEnv> exec_env_;
   scoped_ptr<RuntimeState> runtime_state_;
   FragmentState* fragment_state_;
-  TUniqueId next_instance_id_;
+  UniqueIdPB next_instance_id_;
   string stmt_;
   // The sorting expression for the single BIGINT column.
   vector<ScalarExpr*> ordering_exprs_;
@@ -282,7 +283,7 @@ class DataStreamTest : public testing::Test {
   TDataStreamSink broadcast_sink_;
   TDataStreamSink random_sink_;
   TDataStreamSink hash_sink_;
-  vector<TPlanFragmentDestination> dest_;
+  google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB> dest_;
 
   struct SenderInfo {
     unique_ptr<thread> thread_handle;
@@ -318,14 +319,12 @@ class DataStreamTest : public testing::Test {
 
   // Create an instance id and add it to dest_
   void GetNextInstanceId(TUniqueId* instance_id) {
-    dest_.push_back(TPlanFragmentDestination());
-    TPlanFragmentDestination& dest = dest_.back();
-    dest.fragment_instance_id = next_instance_id_;
-    dest.thrift_backend.hostname = "localhost";
-    dest.thrift_backend.port = FLAGS_port;
-    dest.__set_krpc_backend(krpc_address_);
-    *instance_id = next_instance_id_;
-    ++next_instance_id_.lo;
+    PlanFragmentDestinationPB* dest = dest_.Add();
+    *dest->mutable_fragment_instance_id() = next_instance_id_;
+    *dest->mutable_thrift_backend() = MakeNetworkAddressPB("localhost", FLAGS_port);
+    *dest->mutable_krpc_backend() = FromTNetworkAddress(krpc_address_);
+    UniqueIdPBToTUniqueId(next_instance_id_, instance_id);
+    next_instance_id_.set_lo(next_instance_id_.lo() + 1);
   }
 
   // RowDescriptor to mimic "select bigint_col from alltypesagg", except the slot
@@ -592,10 +591,11 @@ class DataStreamTest : public testing::Test {
     RuntimeState state(TQueryCtx(), exec_env_.get(), desc_tbl_);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink sink = GetSink(partition_type);
-    TPlanFragmentCtx fragment_ctx;
-    fragment_ctx.fragment.output_sink = sink;
-    fragment_ctx.destinations = dest_;
-    FragmentState fragment_state(state.query_state(), fragment_ctx);
+    TPlanFragment fragment;
+    fragment.output_sink = sink;
+    PlanFragmentCtxPB fragment_ctx;
+    *fragment_ctx.mutable_destinations() = dest_;
+    FragmentState fragment_state(state.query_state(), fragment, fragment_ctx);
     DataSinkConfig* data_sink = nullptr;
     EXPECT_OK(DataSinkConfig::CreateConfig(sink, row_desc_, &fragment_state, &data_sink));
 
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 94e3105..09db8f6 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -54,6 +54,7 @@
 
 #include "common/names.h"
 
+using google::protobuf::RepeatedPtrField;
 using kudu::rpc::RpcContext;
 using namespace impala;
 using namespace apache::thrift;
@@ -67,11 +68,14 @@ static const string PREPARE_TIMER_NAME = "PrepareTime";
 static const string EXEC_TIMER_NAME = "ExecTime";
 
 FragmentInstanceState::FragmentInstanceState(QueryState* query_state,
-    FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx)
+    FragmentState* fragment_state, const TPlanFragmentInstanceCtx& instance_ctx,
+    const PlanFragmentInstanceCtxPB& instance_ctx_pb)
   : query_state_(query_state),
     fragment_state_(fragment_state),
+    fragment_(fragment_state->fragment()),
+    instance_ctx_(instance_ctx),
     fragment_ctx_(fragment_state->fragment_ctx()),
-    instance_ctx_(instance_ctx) {}
+    instance_ctx_pb_(instance_ctx_pb) {}
 
 Status FragmentInstanceState::Exec() {
   bool is_prepared = false;
@@ -139,8 +143,8 @@ Status FragmentInstanceState::Prepare() {
 
   // Do not call RETURN_IF_ERROR or explicitly return before this line,
   // runtime_state_ != nullptr is a postcondition of this function.
-  runtime_state_ = obj_pool()->Add(new RuntimeState(
-      query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance()));
+  runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,
+      instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
@@ -197,12 +201,12 @@ Status FragmentInstanceState::Prepare() {
 
   // set scan ranges
   vector<ExecNode*> scan_nodes;
-  vector<TScanRangeParams> no_scan_ranges;
+  ScanRangesPB no_scan_ranges;
   exec_tree_->CollectScanNodes(&scan_nodes);
   for (ExecNode* scan_node: scan_nodes) {
-    const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
-        instance_ctx_.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
-    static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges);
+    const ScanRangesPB& scan_ranges = FindWithDefault(
+        instance_ctx_pb_.per_node_scan_ranges(), scan_node->id(), no_scan_ranges);
+    static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
   }
 
   RuntimeProfile::Counter* prepare_timer =
@@ -216,7 +220,7 @@ Status FragmentInstanceState::Prepare() {
   // prepare sink_
   const DataSinkConfig* sink_config = fragment_state_->sink_config();
   DCHECK(sink_config != nullptr);
-  sink_ = sink_config->CreateSink(fragment_ctx_, instance_ctx_, runtime_state_);
+  sink_ = sink_config->CreateSink(runtime_state_);
   RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
@@ -396,7 +400,7 @@ void FragmentInstanceState::Close() {
 
   // If we haven't already released this thread token in Prepare(), release
   // it before calling Close().
-  if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
+  if (fragment_.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
     ReleaseThreadToken();
   }
 
@@ -536,13 +540,13 @@ const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB stat
 }
 
 PlanRootSink* FragmentInstanceState::GetRootSink() const {
-  return fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
+  return fragment_.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
       static_cast<PlanRootSink*>(sink_) :
       nullptr;
 }
 
 bool FragmentInstanceState::HasJoinBuildSink() const {
-  return IsJoinBuildSink(fragment_ctx_.fragment.output_sink.type);
+  return IsJoinBuildSink(fragment_.output_sink.type);
 }
 
 JoinBuilder* FragmentInstanceState::GetJoinBuildSink() const {
@@ -562,11 +566,11 @@ RuntimeProfile* FragmentInstanceState::profile() const {
 }
 
 void FragmentInstanceState::PrintVolumeIds() {
-  if (instance_ctx_.per_node_scan_ranges.empty()) return;
+  if (instance_ctx_pb_.per_node_scan_ranges().empty()) return;
 
   HdfsScanNodeBase::PerVolumeStats per_volume_stats;
-  for (const PerNodeScanRanges::value_type& entry: instance_ctx_.per_node_scan_ranges) {
-    HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
+  for (const auto& entry : instance_ctx_pb_.per_node_scan_ranges()) {
+    HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second.scan_ranges(), &per_volume_stats);
   }
 
   stringstream str;
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 36c81d8..3031174 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -44,7 +44,7 @@ class RpcContext;
 namespace impala {
 
 class FragmentState;
-class TPlanFragmentCtx;
+class TPlanFragment;
 class TPlanFragmentInstanceCtx;
 class TBloomFilter;
 class TUniqueId;
@@ -81,7 +81,8 @@ class JoinBuilder;
 class FragmentInstanceState {
  public:
   FragmentInstanceState(QueryState* query_state, FragmentState* fragment_state,
-      const TPlanFragmentInstanceCtx& instance_ctx);
+      const TPlanFragmentInstanceCtx& instance_ctx,
+      const PlanFragmentInstanceCtxPB& instance_ctx_pb);
 
   /// Main loop of fragment instance execution. Blocks until execution finishes and
   /// automatically releases resources. Returns execution status.
@@ -131,8 +132,10 @@ class FragmentInstanceState {
   RuntimeState* runtime_state() { return runtime_state_; }
   RuntimeProfile* profile() const;
   const TQueryCtx& query_ctx() const;
-  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const TPlanFragment& fragment() const { return fragment_; }
   const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
+  const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
+  const PlanFragmentInstanceCtxPB& instance_ctx_pb() const { return instance_ctx_pb_; }
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
   FInstanceExecStatePB current_state() const { return current_state_.Load(); }
@@ -158,8 +161,10 @@ class FragmentInstanceState {
  private:
   QueryState* query_state_;
   FragmentState* fragment_state_;
-  const TPlanFragmentCtx& fragment_ctx_;
+  const TPlanFragment& fragment_;
   const TPlanFragmentInstanceCtx& instance_ctx_;
+  const PlanFragmentCtxPB& fragment_ctx_;
+  const PlanFragmentInstanceCtxPB& instance_ctx_pb_;
 
   /// All following member variables that are initialized to nullptr are set
   /// in Prepare().
diff --git a/be/src/runtime/fragment-state.cc b/be/src/runtime/fragment-state.cc
index 9c27312..15d9f1a 100644
--- a/be/src/runtime/fragment-state.cc
+++ b/be/src/runtime/fragment-state.cc
@@ -38,26 +38,35 @@ const string FragmentState::FSTATE_THREAD_GROUP_NAME = "fragment-init";
 const string FragmentState::FSTATE_THREAD_NAME_PREFIX = "init-and-codegen";
 
 Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
-    QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
+    const ExecQueryFInstancesRequestPB& exec_request, QueryState* state,
+    std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map) {
   int fragment_ctx_idx = 0;
-  const TPlanFragmentCtx& frag_ctx = fragment_info.fragment_ctxs[fragment_ctx_idx];
+  const TPlanFragment& frag = fragment_info.fragments[fragment_ctx_idx];
+  const PlanFragmentCtxPB& frag_ctx = exec_request.fragment_ctxs(fragment_ctx_idx);
   FragmentState* fragment_state =
-      state->obj_pool()->Add(new FragmentState(state, frag_ctx));
+      state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
   fragment_map[fragment_state->fragment_idx()] = fragment_state;
-  for (const TPlanFragmentInstanceCtx& instance_ctx :
-      fragment_info.fragment_instance_ctxs) {
-    // determine corresponding TPlanFragmentCtx
+  for (int i = 0; i < fragment_info.fragment_instance_ctxs.size(); ++i) {
+    const TPlanFragmentInstanceCtx& instance_ctx =
+        fragment_info.fragment_instance_ctxs[i];
+    const PlanFragmentInstanceCtxPB& instance_ctx_pb =
+        exec_request.fragment_instance_ctxs(i);
+    DCHECK_EQ(instance_ctx.fragment_idx, instance_ctx_pb.fragment_idx());
+    // determine corresponding TPlanFragment
     if (fragment_state->fragment_idx() != instance_ctx.fragment_idx) {
       ++fragment_ctx_idx;
-      DCHECK_LT(fragment_ctx_idx, fragment_info.fragment_ctxs.size());
-      const TPlanFragmentCtx& fragment_ctx =
-          fragment_info.fragment_ctxs[fragment_ctx_idx];
-      fragment_state = state->obj_pool()->Add(new FragmentState(state, fragment_ctx));
+      DCHECK_LT(fragment_ctx_idx, fragment_info.fragments.size());
+      const TPlanFragment& fragment = fragment_info.fragments[fragment_ctx_idx];
+      const PlanFragmentCtxPB& fragment_ctx =
+          exec_request.fragment_ctxs(fragment_ctx_idx);
+      DCHECK_EQ(fragment.idx, fragment_ctx.fragment_idx());
+      fragment_state =
+          state->obj_pool()->Add(new FragmentState(state, fragment, fragment_ctx));
       fragment_map[fragment_state->fragment_idx()] = fragment_state;
       // we expect fragment and instance contexts to follow the same order
       DCHECK_EQ(fragment_state->fragment_idx(), instance_ctx.fragment_idx);
     }
-    fragment_state->AddInstance(&instance_ctx);
+    fragment_state->AddInstance(&instance_ctx, &instance_ctx_pb);
   }
   // Init all fragments.
   for (auto& elem : fragment_map) {
@@ -67,9 +76,9 @@ Status FragmentState::CreateFragmentStateMap(const TExecPlanFragmentInfo& fragme
 }
 
 Status FragmentState::Init() {
-  RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_ctx_.fragment.plan, &plan_tree_));
-  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(fragment_ctx_.fragment.output_sink,
-      plan_tree_->row_descriptor_, this, &sink_config_));
+  RETURN_IF_ERROR(PlanNode::CreateTree(this, fragment_.plan, &plan_tree_));
+  RETURN_IF_ERROR(DataSinkConfig::CreateConfig(
+      fragment_.output_sink, plan_tree_->row_descriptor_, this, &sink_config_));
   return Status::OK();
 }
 
@@ -79,8 +88,8 @@ Status FragmentState::InvokeCodegen() {
     codegen_invoked_ = true;
     codegen_status_ = CodegenHelper();
     if (!codegen_status_.ok()) {
-      string error_ctx = Substitute("Fragment failed during codegen, fragment index: $0",
-          fragment_ctx_.fragment.display_name);
+      string error_ctx = Substitute(
+          "Fragment failed during codegen, fragment index: $0", fragment_.display_name);
       codegen_status_.AddDetail(error_ctx);
       query_state_->ErrorDuringFragmentCodegen(codegen_status_);
     }
@@ -111,12 +120,11 @@ Status FragmentState::CodegenHelper() {
   return Status::OK();
 }
 
-FragmentState::FragmentState(QueryState* query_state,
-    const TPlanFragmentCtx& fragment_ctx)
-  : query_state_(query_state),
-    fragment_ctx_(fragment_ctx) {
-  runtime_profile_ = RuntimeProfile::Create(query_state->obj_pool(),
-      Substitute("Fragment $0", fragment_ctx_.fragment.display_name));
+FragmentState::FragmentState(QueryState* query_state, const TPlanFragment& fragment,
+    const PlanFragmentCtxPB& fragment_ctx)
+  : query_state_(query_state), fragment_(fragment), fragment_ctx_(fragment_ctx) {
+  runtime_profile_ = RuntimeProfile::Create(
+      query_state->obj_pool(), Substitute("Fragment $0", fragment_.display_name));
   query_state_->host_profile()->AddChild(runtime_profile_);
 }
 
diff --git a/be/src/runtime/fragment-state.h b/be/src/runtime/fragment-state.h
index 54cf45d..5b983b2 100644
--- a/be/src/runtime/fragment-state.h
+++ b/be/src/runtime/fragment-state.h
@@ -37,10 +37,13 @@ class RuntimeProfile;
 class FragmentState {
  public:
   /// Create a map of fragment index to its FragmentState object and only populate the
-  /// thrift references of the fragment and instance context objects from 'fragment_info'.
+  /// thrift and protobuf references of the fragment and instance context objects from
+  /// 'fragment_info' and 'exec_request'.
   static Status CreateFragmentStateMap(const TExecPlanFragmentInfo& fragment_info,
-      QueryState* state, std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map);
-  FragmentState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx);
+      const ExecQueryFInstancesRequestPB& exec_request, QueryState* state,
+      std::unordered_map<TFragmentIdx, FragmentState*>& fragment_map);
+  FragmentState(QueryState* query_state, const TPlanFragment& fragment,
+      const PlanFragmentCtxPB& fragment_ctx);
   ~FragmentState();
 
   /// Called by all the fragment instance threads that execute this fragment. The first
@@ -54,12 +57,16 @@ class FragmentState {
   void ReleaseResources();
 
   ObjectPool* obj_pool() { return &obj_pool_; }
-  int fragment_idx() const { return fragment_ctx_.fragment.idx; }
+  int fragment_idx() const { return fragment_.idx; }
   const TQueryOptions& query_options() const { return query_state_->query_options(); }
-  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const TPlanFragment& fragment() const { return fragment_; }
+  const PlanFragmentCtxPB& fragment_ctx() const { return fragment_ctx_; }
   const std::vector<const TPlanFragmentInstanceCtx*>& instance_ctxs() const {
     return instance_ctxs_;
   }
+  const std::vector<const PlanFragmentInstanceCtxPB*>& instance_ctx_pbs() const {
+    return instance_ctx_pbs_;
+  }
   const PlanNode* plan_tree() const { return plan_tree_; }
   const DataSinkConfig* sink_config() const { return sink_config_; }
   const TUniqueId& query_id() const { return query_state_->query_id(); }
@@ -153,8 +160,11 @@ class FragmentState {
   QueryState* query_state_;
 
   /// References to the thrift structs for this fragment.
-  const TPlanFragmentCtx& fragment_ctx_;
+  const TPlanFragment& fragment_;
   std::vector<const TPlanFragmentInstanceCtx*> instance_ctxs_;
+  /// References to the protobuf structs for this fragment.
+  const PlanFragmentCtxPB& fragment_ctx_;
+  std::vector<const PlanFragmentInstanceCtxPB*> instance_ctx_pbs_;
 
   /// Lives in obj_pool(). Not mutated after being initialized in InitAndCodegen() except
   /// for being closed.
@@ -181,10 +191,12 @@ class FragmentState {
   /// fragment instance to call InvokeCodegen() does the actual codegen work.
   bool codegen_invoked_ = false;
 
-  /// Used by the CreateFragmentStateMap to add TPlanFragmentInstanceCtx thrift objects
-  /// for the fragment that this object represents.
-  void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx) {
+  /// Used by the CreateFragmentStateMap to add the TPlanFragmentInstanceCtx and the
+  /// PlanFragmentInstanceCtxPB for the fragment that this object represents.
+  void AddInstance(const TPlanFragmentInstanceCtx* instance_ctx,
+      const PlanFragmentInstanceCtxPB* instance_ctx_pb) {
     instance_ctxs_.push_back(instance_ctx);
+    instance_ctx_pbs_.push_back(instance_ctx_pb);
   }
 
   /// Helper method used by InvokeCodegen(). Does the actual codegen work.
diff --git a/be/src/runtime/krpc-data-stream-sender.cc 
b/be/src/runtime/krpc-data-stream-sender.cc
index 5f9becc..787235c 100644
--- a/be/src/runtime/krpc-data-stream-sender.cc
+++ b/be/src/runtime/krpc-data-stream-sender.cc
@@ -89,18 +89,18 @@ Status KrpcDataStreamSenderConfig::Init(
     exchange_hash_seed_ =
         KrpcDataStreamSender::EXCHANGE_HASH_SEED_CONST ^ state->query_id().hi;
   }
-  num_channels_ = state->fragment_ctx().destinations.size();
+  num_channels_ = state->fragment_ctx().destinations().size();
   state->CheckAndAddCodegenDisabledMessage(codegen_status_msgs_);
   return Status::OK();
 }
 
-DataSink* KrpcDataStreamSenderConfig::CreateSink(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, RuntimeState* state) const {
+DataSink* KrpcDataStreamSenderConfig::CreateSink(RuntimeState* state) const {
   // We have one fragment per sink, so we can use the fragment index as the sink ID.
-  TDataSinkId sink_id = fragment_ctx.fragment.idx;
-  return state->obj_pool()->Add(new KrpcDataStreamSender(sink_id,
-      fragment_instance_ctx.sender_id, *this, tsink_->stream_sink,
-      fragment_ctx.destinations, FLAGS_data_stream_sender_buffer_size, state));
+  TDataSinkId sink_id = state->fragment().idx;
+  return state->obj_pool()->Add(
+      new KrpcDataStreamSender(sink_id, state->instance_ctx().sender_id, *this,
+          tsink_->stream_sink, state->fragment_ctx().destinations(),
+          FLAGS_data_stream_sender_buffer_size, state));
 }
 
 void KrpcDataStreamSenderConfig::Close() {
@@ -148,8 +148,8 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
   // data is getting accumulated before being sent; it only applies when data is added via
   // AddRow() and not sent directly via SendBatch().
   Channel(KrpcDataStreamSender* parent, const RowDescriptor* row_desc,
-      const std::string& hostname, const TNetworkAddress& destination,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
+      const std::string& hostname, const NetworkAddressPB& destination,
+      const UniqueIdPB& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size)
     : parent_(parent),
       row_desc_(row_desc),
       hostname_(hostname),
@@ -216,8 +216,8 @@ class KrpcDataStreamSender::Channel : public CacheLineAligned {
 
   // The triplet of IP-address:port/finst-id/node-id uniquely identifies the receiver.
   const std::string hostname_;
-  const TNetworkAddress address_;
-  const TUniqueId fragment_instance_id_;
+  const NetworkAddressPB address_;
+  const UniqueIdPB fragment_instance_id_;
   const PlanNodeId dest_node_id_;
 
   // The row batch for accumulating rows copied from AddRow().
@@ -373,7 +373,8 @@ Status KrpcDataStreamSender::Channel::Init(RuntimeState* state) {
   batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
 
   // Create a DataStreamService proxy to the destination.
-  RETURN_IF_ERROR(DataStreamService::GetProxy(address_, hostname_, &proxy_));
+  RETURN_IF_ERROR(
+      DataStreamService::GetProxy(FromNetworkAddressPB(address_), hostname_, &proxy_));
   return Status::OK();
 }
 
@@ -390,7 +391,7 @@ template <typename ResponsePBType>
 void KrpcDataStreamSender::Channel::LogSlowRpc(
     const char* rpc_name, int64_t total_time_ns, const ResponsePBType& resp) {
   int64_t network_time_ns = total_time_ns - resp_.receiver_latency_ns();
-  LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_)
+  LOG(INFO) << "Slow " << rpc_name << " RPC to " << address_
             << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
             << "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". "
             << "Receiver time: "
@@ -400,7 +401,7 @@ void KrpcDataStreamSender::Channel::LogSlowRpc(
 
 void KrpcDataStreamSender::Channel::LogSlowFailedRpc(
     const char* rpc_name, int64_t total_time_ns, const kudu::Status& err) {
-  LOG(INFO) << "Slow " << rpc_name << " RPC to " << TNetworkAddressToString(address_)
+  LOG(INFO) << "Slow " << rpc_name << " RPC to " << address_
             << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
             << "took " << PrettyPrinter::Print(total_time_ns, TUnit::TIME_NS) << ". "
             << "Error: " << err.ToString();
@@ -424,7 +425,7 @@ Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock
   }
   int64_t elapsed_time_ns = timer.ElapsedTime();
   if (IsSlowRpc(elapsed_time_ns)) {
-    LOG(INFO) << "Long delay waiting for RPC to " << TNetworkAddressToString(address_)
+    LOG(INFO) << "Long delay waiting for RPC to " << address_
               << " (fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
               << "took " << PrettyPrinter::Print(elapsed_time_ns, TUnit::TIME_NS);
   }
@@ -437,9 +438,9 @@ Status KrpcDataStreamSender::Channel::WaitForRpcLocked(std::unique_lock<SpinLock
 
   DCHECK(!rpc_in_flight_);
   if (UNLIKELY(!rpc_status_.ok())) {
-    LOG(ERROR) << "channel send to " << TNetworkAddressToString(address_) << " failed: "
-               << "(fragment_instance_id=" << PrintId(fragment_instance_id_) << "): "
-               << rpc_status_.GetDetail();
+    LOG(ERROR) << "channel send to " << address_ << " failed: (fragment_instance_id="
+               << PrintId(fragment_instance_id_)
+               << "): " << rpc_status_.GetDetail();
     return rpc_status_;
   }
   return Status::OK();
@@ -523,7 +524,7 @@ void KrpcDataStreamSender::Channel::TransmitDataCompleteCb() {
     DoRpcFn rpc_fn =
         boost::bind(&KrpcDataStreamSender::Channel::DoTransmitDataRpc, this);
     const string& prepend =
-        Substitute("TransmitData() to $0 failed", TNetworkAddressToString(address_));
+        Substitute("TransmitData() to $0 failed", NetworkAddressPBToString(address_));
     HandleFailedRPC(rpc_fn, controller_status, prepend);
   }
 }
@@ -534,9 +535,7 @@ Status KrpcDataStreamSender::Channel::DoTransmitDataRpc() {
 
   // Initialize some constant fields in the request protobuf.
   TransmitDataRequestPB req;
-  UniqueIdPB* finstance_id_pb = req.mutable_dest_fragment_instance_id();
-  finstance_id_pb->set_lo(fragment_instance_id_.lo);
-  finstance_id_pb->set_hi(fragment_instance_id_.hi);
+  *req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
   req.set_sender_id(parent_->sender_id_);
   req.set_dest_node_id(dest_node_id_);
 
@@ -641,7 +640,7 @@ void KrpcDataStreamSender::Channel::EndDataStreamCompleteCb() {
     DoRpcFn rpc_fn =
         boost::bind(&KrpcDataStreamSender::Channel::DoEndDataStreamRpc, this);
     const string& prepend =
-        Substitute("EndDataStream() to $0 failed", TNetworkAddressToString(address_));
+        Substitute("EndDataStream() to $0 failed", NetworkAddressPBToString(address_));
     HandleFailedRPC(rpc_fn, controller_status, prepend);
   }
 }
@@ -650,9 +649,7 @@ Status KrpcDataStreamSender::Channel::DoEndDataStreamRpc() {
   DCHECK(rpc_in_flight_);
   EndDataStreamRequestPB eos_req;
   rpc_controller_.Reset();
-  UniqueIdPB* finstance_id_pb = eos_req.mutable_dest_fragment_instance_id();
-  finstance_id_pb->set_lo(fragment_instance_id_.lo);
-  finstance_id_pb->set_hi(fragment_instance_id_.hi);
+  *eos_req.mutable_dest_fragment_instance_id() = fragment_instance_id_;
   eos_req.set_sender_id(parent_->sender_id_);
   eos_req.set_dest_node_id(dest_node_id_);
   eos_resp_.Clear();
@@ -710,7 +707,7 @@ void KrpcDataStreamSender::Channel::Teardown(RuntimeState* 
state) {
 
 KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
     const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& sink,
-    const std::vector<TPlanFragmentDestination>& destinations,
+    const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& 
destinations,
     int per_channel_buffer_size, RuntimeState* state)
   : DataSink(sink_id, sink_config,
         Substitute("KrpcDataStreamSender (dst_id=$0)", sink.dest_node_id), 
state),
@@ -728,11 +725,10 @@ KrpcDataStreamSender::KrpcDataStreamSender(TDataSinkId 
sink_id, int sender_id,
       || sink.output_partition.type == TPartitionType::RANDOM
       || sink.output_partition.type == TPartitionType::KUDU);
 
-  for (int i = 0; i < destinations.size(); ++i) {
-    channels_.emplace_back(
-        new Channel(this, row_desc_, destinations[i].thrift_backend.hostname,
-            destinations[i].krpc_backend, destinations[i].fragment_instance_id,
-            sink.dest_node_id, per_channel_buffer_size));
+  for (const auto& destination : destinations) {
+    channels_.emplace_back(new Channel(this, row_desc_,
+        destination.thrift_backend().hostname(), destination.krpc_backend(),
+        destination.fragment_instance_id(), sink.dest_node_id, 
per_channel_buffer_size));
   }
 
   if (partition_type_ == TPartitionType::UNPARTITIONED
diff --git a/be/src/runtime/krpc-data-stream-sender.h 
b/be/src/runtime/krpc-data-stream-sender.h
index a7e55a1..6e9a933 100644
--- a/be/src/runtime/krpc-data-stream-sender.h
+++ b/be/src/runtime/krpc-data-stream-sender.h
@@ -38,13 +38,11 @@ class MemTracker;
 class RowDescriptor;
 class TDataStreamSink;
 class TNetworkAddress;
-class TPlanFragmentDestination;
+class PlanFragmentDestinationPB;
 
 class KrpcDataStreamSenderConfig : public DataSinkConfig {
  public:
-  DataSink* CreateSink(const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      RuntimeState* state) const override;
+  DataSink* CreateSink(RuntimeState* state) const override;
   void Close() override;
 
   /// Codegen KrpcDataStreamSender::HashAndAddRows() if partitioning type is
@@ -108,7 +106,7 @@ class KrpcDataStreamSender : public DataSink {
   /// and RANDOM.
   KrpcDataStreamSender(TDataSinkId sink_id, int sender_id,
       const KrpcDataStreamSenderConfig& sink_config, const TDataStreamSink& 
sink,
-      const std::vector<TPlanFragmentDestination>& destinations,
+      const google::protobuf::RepeatedPtrField<PlanFragmentDestinationPB>& 
destinations,
       int per_channel_buffer_size, RuntimeState* state);
 
   virtual ~KrpcDataStreamSender();
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 02ef056..7bf3a55 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -209,10 +209,16 @@ Status QueryState::Init(const 
ExecQueryFInstancesRequestPB* exec_rpc_params,
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
+  exec_rpc_params_.mutable_fragment_ctxs()->Swap(
+      
const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentCtxPB>*>(
+          &exec_rpc_params->fragment_ctxs()));
+  exec_rpc_params_.mutable_fragment_instance_ctxs()->Swap(
+      
const_cast<google::protobuf::RepeatedPtrField<impala::PlanFragmentInstanceCtxPB>*>(
+          &exec_rpc_params->fragment_instance_ctxs()));
   TExecPlanFragmentInfo& non_const_fragment_info =
       const_cast<TExecPlanFragmentInfo&>(fragment_info);
-  fragment_info_.fragment_ctxs.swap(non_const_fragment_info.fragment_ctxs);
-  fragment_info_.__isset.fragment_ctxs = true;
+  fragment_info_.fragments.swap(non_const_fragment_info.fragments);
+  fragment_info_.__isset.fragments = true;
   fragment_info_.fragment_instance_ctxs.swap(
       non_const_fragment_info.fragment_instance_ctxs);
   fragment_info_.__isset.fragment_instance_ctxs = true;
@@ -286,13 +292,12 @@ bool VerifyFiltersProduced(const 
vector<TPlanFragmentInstanceCtx>& instance_ctxs
 Status QueryState::InitFilterBank() {
   int64_t runtime_filters_reservation_bytes = 0;
   int fragment_ctx_idx = -1;
-  const vector<TPlanFragmentCtx>& fragment_ctxs = fragment_info_.fragment_ctxs;
+  const vector<TPlanFragment>& fragments = fragment_info_.fragments;
   const vector<TPlanFragmentInstanceCtx>& instance_ctxs =
       fragment_info_.fragment_instance_ctxs;
   // Add entries for all produced and consumed filters.
   unordered_map<int32_t, FilterRegistration> filters;
-  for (const TPlanFragmentCtx& fragment_ctx : fragment_ctxs) {
-    const TPlanFragment& fragment = fragment_ctx.fragment;
+  for (const TPlanFragment& fragment : fragments) {
     for (const TPlanNode& plan_node : fragment.plan.nodes) {
       if (!plan_node.__isset.runtime_filters) continue;
       for (const TRuntimeFilterDesc& filter : plan_node.runtime_filters) {
@@ -315,15 +320,15 @@ Status QueryState::InitFilterBank() {
       << "Filters produced by all instances on the same backend should be the 
same";
   for (const TPlanFragmentInstanceCtx& instance_ctx : instance_ctxs) {
     bool first_instance_of_fragment = fragment_ctx_idx == -1
-        || fragment_ctxs[fragment_ctx_idx].fragment.idx != 
instance_ctx.fragment_idx;
+        || fragments[fragment_ctx_idx].idx != instance_ctx.fragment_idx;
     if (first_instance_of_fragment) {
       ++fragment_ctx_idx;
-      DCHECK_EQ(fragment_ctxs[fragment_ctx_idx].fragment.idx, 
instance_ctx.fragment_idx);
+      DCHECK_EQ(fragments[fragment_ctx_idx].idx, instance_ctx.fragment_idx);
     }
     // TODO: this over-reserves memory a bit in a couple of cases:
     // * if different fragments on this backend consume or produce the same 
filter.
     // * if a finstance was chosen not to produce a global broadcast filter.
-    const TPlanFragment& fragment = fragment_ctxs[fragment_ctx_idx].fragment;
+    const TPlanFragment& fragment = fragments[fragment_ctx_idx];
     runtime_filters_reservation_bytes +=
         fragment.produced_runtime_filters_reservation_bytes;
     if (first_instance_of_fragment) {
@@ -617,7 +622,7 @@ bool QueryState::StartFInstances() {
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in 
Init()";
 
-  DCHECK_GT(fragment_info_.fragment_ctxs.size(), 0);
+  DCHECK_GT(fragment_info_.fragments.size(), 0);
   vector<unique_ptr<Thread>> codegen_threads;
   int num_unstarted_instances = fragment_info_.fragment_instance_ctxs.size();
 
@@ -629,16 +634,20 @@ bool QueryState::StartFInstances() {
   VLOG(2) << "descriptor table for query=" << PrintId(query_id())
           << "\n" << desc_tbl_->DebugString();
 
-  start_finstances_status =
-      FragmentState::CreateFragmentStateMap(fragment_info_, this, 
fragment_state_map_);
+  start_finstances_status = FragmentState::CreateFragmentStateMap(
+      fragment_info_, exec_rpc_params_, this, fragment_state_map_);
   if (UNLIKELY(!start_finstances_status.ok())) goto error;
 
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (auto& fragment : fragment_state_map_) {
     FragmentState* fragment_state = fragment.second;
-    for (const TPlanFragmentInstanceCtx* instance_ctx : 
fragment_state->instance_ctxs()) {
-      FragmentInstanceState* fis =
-          obj_pool_.Add(new FragmentInstanceState(this, fragment_state, 
*instance_ctx));
+    for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i) {
+      const TPlanFragmentInstanceCtx* instance_ctx = 
fragment_state->instance_ctxs()[i];
+      const PlanFragmentInstanceCtxPB* instance_ctx_pb =
+          fragment_state->instance_ctx_pbs()[i];
+      DCHECK_EQ(instance_ctx->fragment_idx, instance_ctx_pb->fragment_idx());
+      FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(
+          this, fragment_state, *instance_ctx, *instance_ctx_pb));
 
       // start new thread to execute instance
       refcnt_.Add(1); // decremented in ExecFInstance()
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 64d2edb..aed2e11 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -205,12 +205,12 @@ Status RowBatch::FromProtobuf(const RowDescriptor* 
row_desc,
 
   row_batch->num_rows_ = header.num_rows();
   row_batch->capacity_ = header.num_rows();
-  const CompressionType& compression_type = header.compression_type();
-  DCHECK(compression_type == CompressionType::NONE ||
-      compression_type == CompressionType::LZ4)
+  const CompressionTypePB& compression_type = header.compression_type();
+  DCHECK(compression_type == CompressionTypePB::NONE ||
+      compression_type == CompressionTypePB::LZ4)
       << "Unexpected compression type: " << compression_type;
   row_batch->Deserialize(input_tuple_offsets, input_tuple_data, 
uncompressed_size,
-      compression_type == CompressionType::LZ4, tuple_data);
+      compression_type == CompressionTypePB::LZ4, tuple_data);
   *row_batch_ptr = std::move(row_batch);
   return Status::OK();
 }
@@ -266,7 +266,7 @@ Status RowBatch::Serialize(OutboundRowBatch* output_batch) {
   header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size());
   header->set_uncompressed_size(uncompressed_size);
   header->set_compression_type(
-      is_compressed ? CompressionType::LZ4 : CompressionType::NONE);
+      is_compressed ? CompressionTypePB::LZ4 : CompressionTypePB::NONE);
   return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index d314906..05c0d65 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -70,11 +70,15 @@ namespace impala {
 
 const char* RuntimeState::LLVM_CLASS_NAME = "class.impala::RuntimeState";
 
-RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& 
fragment_ctx,
-    const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
+RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragment& 
fragment,
+    const TPlanFragmentInstanceCtx& instance_ctx,
+    const PlanFragmentCtxPB& fragment_ctx,
+    const PlanFragmentInstanceCtxPB& instance_ctx_pb, ExecEnv* exec_env)
   : query_state_(query_state),
-    fragment_ctx_(&fragment_ctx),
+    fragment_(&fragment),
     instance_ctx_(&instance_ctx),
+    fragment_ctx_(&fragment_ctx),
+    instance_ctx_pb_(&instance_ctx_pb),
     now_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
         query_state->query_ctx().now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
@@ -97,8 +101,10 @@ RuntimeState::RuntimeState(
             qctx.client_request.query_options.mem_limit :
             -1,
         "test-pool")),
-    fragment_ctx_(nullptr),
+    fragment_(nullptr),
     instance_ctx_(nullptr),
+    fragment_ctx_(nullptr),
+    instance_ctx_pb_(nullptr),
     local_query_state_(query_state_),
     now_(new 
TimestampValue(TimestampValue::ParseSimpleDateFormat(qctx.now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::ParseSimpleDateFormat(
@@ -131,9 +137,9 @@ void RuntimeState::Init() {
   // Register with the thread mgr
   resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
   DCHECK(resource_pool_ != nullptr);
-  if (fragment_ctx_ != nullptr) {
+  if (fragment_ != nullptr) {
     // Ensure that the planner correctly determined the required threads.
-    
resource_pool_->set_max_required_threads(fragment_ctx_->fragment.thread_reservation);
+    resource_pool_->set_max_required_threads(fragment_->thread_reservation);
   }
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), 
"TotalThreads");
@@ -330,14 +336,12 @@ void RuntimeState::ReleaseResources() {
   released_resources_ = true;
 }
 
-void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t 
posix_error_code) {
+void RuntimeState::SetRPCErrorInfo(NetworkAddressPB dest_node, int16_t 
posix_error_code) {
   std::lock_guard<SpinLock> l(aux_error_info_lock_);
   if (aux_error_info_ == nullptr && !reported_aux_error_info_) {
     aux_error_info_.reset(new AuxErrorInfoPB());
     RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
-    NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
-    network_addr->set_hostname(dest_node.hostname);
-    network_addr->set_port(dest_node.port);
+    *rpc_error_info->mutable_dest_node() = dest_node;
     rpc_error_info->set_posix_error_code(posix_error_code);
   }
 }
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 91e500e..27c3704 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -54,7 +54,7 @@ class ThreadResourcePool;
 class TUniqueId;
 class ExecEnv;
 class HBaseTableFactory;
-class TPlanFragmentCtx;
+class TPlanFragment;
 class TPlanFragmentInstanceCtx;
 class QueryState;
 class ConditionVariable;
@@ -81,8 +81,10 @@ class RuntimeState {
  public:
   /// query_state, fragment_ctx, and instance_ctx need to be alive at least as 
long as
   /// the constructed RuntimeState
-  RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env);
+  RuntimeState(QueryState* query_state, const TPlanFragment& fragment,
+      const TPlanFragmentInstanceCtx& instance_ctx,
+      const PlanFragmentCtxPB& fragment_ctx,
+      const PlanFragmentInstanceCtxPB& instance_ctx_pb, ExecEnv* exec_env);
 
   /// RuntimeState for test execution and fe-support.cc. Creates its own 
QueryState and
   /// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it 
to "test-pool".
@@ -102,8 +104,10 @@ class RuntimeState {
   bool strict_mode() const { return query_options().strict_mode; }
   bool decimal_v2() const { return query_options().decimal_v2; }
   const TQueryCtx& query_ctx() const;
-  const TPlanFragmentCtx& fragment_ctx() const { return *fragment_ctx_; }
+  const TPlanFragment& fragment() const { return *fragment_; }
   const TPlanFragmentInstanceCtx& instance_ctx() const { return 
*instance_ctx_; }
+  const PlanFragmentCtxPB& fragment_ctx() const { return *fragment_ctx_; }
+  const PlanFragmentInstanceCtxPB& instance_ctx_pb() const { return 
*instance_ctx_pb_; }
   const TUniqueId& session_id() const { return query_ctx().session.session_id; 
}
   const std::string& do_as_user() const { return 
query_ctx().session.delegated_user; }
   const std::string& connected_user() const {
@@ -275,7 +279,7 @@ class RuntimeState {
   /// the posix error code of the failed RPC. The target node address and 
posix error code
   /// will be included in the AuxErrorInfo returned by GetAuxErrorInfo. This 
method is
   /// idempotent.
-  void SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code);
+  void SetRPCErrorInfo(NetworkAddressPB dest_node, int16_t posix_error_code);
 
   /// Returns true if this RuntimeState has any auxiliary error information, 
false
   /// otherwise. Currently, only SetRPCErrorInfo() sets aux error info.
@@ -308,8 +312,10 @@ class RuntimeState {
 
   /// Global QueryState and original thrift descriptors for this fragment 
instance.
   QueryState* const query_state_;
-  const TPlanFragmentCtx* const fragment_ctx_;
+  const TPlanFragment* const fragment_;
   const TPlanFragmentInstanceCtx* const instance_ctx_;
+  const PlanFragmentCtxPB* const fragment_ctx_;
+  const PlanFragmentInstanceCtxPB* const instance_ctx_pb_;
 
   /// only populated by the (const QueryCtx&, ExecEnv*, DescriptorTbl*) c'tor
   boost::scoped_ptr<QueryState> local_query_state_;
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index f6ef970..92b703d 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -162,17 +162,21 @@ Status TestEnv::CreateQueryState(
   ExecQueryFInstancesRequestPB rpc_params;
   // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
   rpc_params.set_coord_state_idx(0);
+  rpc_params.add_fragment_ctxs();
+  rpc_params.add_fragment_instance_ctxs();
   TExecPlanFragmentInfo fragment_info;
-  
fragment_info.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+  fragment_info.__set_fragments(vector<TPlanFragment>({TPlanFragment()}));
   fragment_info.__set_fragment_instance_ctxs(
       vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
   RETURN_IF_ERROR(qs->Init(&rpc_params, fragment_info));
-  FragmentState* frag_state =
-      qs->obj_pool()->Add(new FragmentState(qs, 
qs->fragment_info_.fragment_ctxs[0]));
+  FragmentState* frag_state = qs->obj_pool()->Add(new FragmentState(
+      qs, qs->fragment_info_.fragments[0], 
qs->exec_rpc_params_.fragment_ctxs(0)));
   FragmentInstanceState* fis = qs->obj_pool()->Add(new 
FragmentInstanceState(qs,
-      frag_state, qs->fragment_info_.fragment_instance_ctxs[0]));
-  RuntimeState* rs = qs->obj_pool()->Add(
-      new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), 
exec_env_.get()));
+      frag_state, qs->fragment_info_.fragment_instance_ctxs[0],
+      qs->exec_rpc_params_.fragment_instance_ctxs(0)));
+  RuntimeState* rs =
+      qs->obj_pool()->Add(new RuntimeState(qs, fis->fragment(), 
fis->instance_ctx(),
+          fis->fragment_ctx(), fis->instance_ctx_pb(), exec_env_.get()));
   runtime_states_.push_back(rs);
 
   *runtime_state = rs;
diff --git a/be/src/scheduling/query-schedule.h 
b/be/src/scheduling/query-schedule.h
index f0ee98a..36527c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -25,6 +25,7 @@
 #include "common/global-types.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h" // for TNetworkAddress
+#include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/statestore_service.pb.h"
 #include "util/container-util.h"
 #include "util/runtime-profile.h"
@@ -35,7 +36,7 @@ struct FragmentExecParams;
 struct FInstanceExecParams;
 
 /// map from scan node id to a list of scan ranges
-typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
+typedef std::map<TPlanNodeId, std::vector<ScanRangeParamsPB>> 
PerNodeScanRanges;
 
 /// map from an impalad host address to the per-node assigned scan ranges;
 /// records scan range assignment for a single fragment
@@ -104,7 +105,7 @@ struct FInstanceExecParams {
   int sender_id = -1;
 
   // List of input join build finstances for joins in this finstance.
-  std::vector<TJoinBuildInput> join_build_inputs;
+  std::vector<JoinBuildInputPB> join_build_inputs;
 
   // If this is a join build fragment, the number of fragment instances that 
consume the
   // join build. -1 = invalid.
@@ -127,7 +128,7 @@ struct FInstanceExecParams {
 /// Execution parameters shared between fragment instances
 struct FragmentExecParams {
   /// output destinations of this fragment
-  std::vector<TPlanFragmentDestination> destinations;
+  std::vector<PlanFragmentDestinationPB> destinations;
 
   /// map from node id to the number of senders (node id expected to be for an
   /// ExchangeNode)
diff --git a/be/src/scheduling/scheduler-test-util.cc 
b/be/src/scheduling/scheduler-test-util.cc
index 132415f..f8a9e50 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -554,16 +554,16 @@ void Result::ProcessAssignments(const AssignmentCallback& 
cb) const {
       const TNetworkAddress& addr = assignment_elem.first;
       const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
       for (const auto& per_node_ranges_elem : per_node_ranges) {
-        const vector<TScanRangeParams> scan_range_params_vector =
+        const vector<ScanRangeParamsPB> scan_range_params_vector =
             per_node_ranges_elem.second;
-        for (const TScanRangeParams& scan_range_params : 
scan_range_params_vector) {
-          const TScanRange& scan_range = scan_range_params.scan_range;
-          DCHECK(scan_range.__isset.hdfs_file_split);
-          const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
-          bool try_hdfs_cache = scan_range_params.__isset.try_hdfs_cache ?
-              scan_range_params.try_hdfs_cache : false;
+        for (const ScanRangeParamsPB& scan_range_params : 
scan_range_params_vector) {
+          const ScanRangePB& scan_range = scan_range_params.scan_range();
+          DCHECK(scan_range.has_hdfs_file_split());
+          const HdfsFileSplitPB& hdfs_file_split = 
scan_range.hdfs_file_split();
+          bool try_hdfs_cache = scan_range_params.has_try_hdfs_cache() ?
+              scan_range_params.try_hdfs_cache() : false;
           bool is_remote =
-              scan_range_params.__isset.is_remote ? 
scan_range_params.is_remote : false;
+              scan_range_params.has_is_remote() ? 
scan_range_params.is_remote() : false;
           cb({addr, hdfs_file_split, try_hdfs_cache, is_remote});
         }
       }
@@ -583,7 +583,7 @@ int Result::CountAssignmentsIf(const AssignmentFilter& 
filter) const {
 int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const {
   int64_t assigned_bytes = 0;
   AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& 
assignment) {
-    if (filter(assignment)) assigned_bytes += 
assignment.hdfs_file_split.length;
+    if (filter(assignment)) assigned_bytes += 
assignment.hdfs_file_split.length();
   };
   ProcessAssignments(cb);
   return assigned_bytes;
@@ -603,7 +603,7 @@ void Result::CountAssignedBytesPerBackend(
   AssignmentCallback cb = [&num_assignments_per_backend](
       const AssignmentInfo& assignment) {
     (*num_assignments_per_backend)[assignment.addr.hostname] +=
-        assignment.hdfs_file_split.length;
+        assignment.hdfs_file_split.length();
   };
   ProcessAssignments(cb);
 }
diff --git a/be/src/scheduling/scheduler-test-util.h 
b/be/src/scheduling/scheduler-test-util.h
index 4005927..0e998a0 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -34,6 +34,7 @@
 namespace impala {
 
 class ClusterMembershipMgr;
+class HdfsFileSplitPB;
 class Scheduler;
 class TTopicDelta;
 
@@ -364,7 +365,7 @@ class Result {
   /// Parameter type for callbacks, which are used to filter scheduling 
results.
   struct AssignmentInfo {
     const TNetworkAddress& addr;
-    const THdfsFileSplit& hdfs_file_split;
+    const HdfsFileSplitPB& hdfs_file_split;
     bool is_cached;
     bool is_remote;
   };
diff --git a/be/src/scheduling/scheduler-test.cc 
b/be/src/scheduling/scheduler-test.cc
index 79bad69..45d652e 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -19,6 +19,7 @@
 #include <random>
 
 #include "common/logging.h"
+#include "gen-cpp/control_service.pb.h"
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/scheduler.h"
 #include "scheduling/scheduler-test-util.h"
@@ -727,36 +728,37 @@ TEST_F(SchedulerTest, TestExecAtCoordWithoutLocalBackend) 
{
 // of the algorithm.
 TEST_F(SchedulerTest, TestMultipleFinstances) {
   const int NUM_RANGES = 16;
-  std::vector<TScanRangeParams> fs_ranges(NUM_RANGES);
-  std::vector<TScanRangeParams> kudu_ranges(NUM_RANGES);
+  std::vector<ScanRangeParamsPB> fs_ranges(NUM_RANGES);
+  std::vector<ScanRangeParamsPB> kudu_ranges(NUM_RANGES);
   // Create ranges with lengths 1, 2, ..., etc.
   for (int i = 0; i < NUM_RANGES; ++i) {
-    fs_ranges[i].scan_range.__set_hdfs_file_split(THdfsFileSplit());
-    fs_ranges[i].scan_range.hdfs_file_split.length = i + 1;
-    kudu_ranges[i].scan_range.__set_kudu_scan_token("fake token");
+    *fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split() = 
HdfsFileSplitPB();
+    fs_ranges[i].mutable_scan_range()->mutable_hdfs_file_split()->set_length(i 
+ 1);
+    kudu_ranges[i].mutable_scan_range()->set_kudu_scan_token("fake token");
   }
 
   // Test handling of the single instance case - all ranges go to the same 
instance.
-  vector<vector<TScanRangeParams>> fs_one_instance =
+  vector<vector<ScanRangeParamsPB>> fs_one_instance =
       Scheduler::AssignRangesToInstances(1, &fs_ranges);
   ASSERT_EQ(1, fs_one_instance.size());
   EXPECT_EQ(NUM_RANGES, fs_one_instance[0].size());
-  vector<vector<TScanRangeParams>> kudu_one_instance =
-    Scheduler::AssignRangesToInstances(1, &kudu_ranges);
+  vector<vector<ScanRangeParamsPB>> kudu_one_instance =
+      Scheduler::AssignRangesToInstances(1, &kudu_ranges);
   ASSERT_EQ(1, kudu_one_instance.size());
   EXPECT_EQ(NUM_RANGES, kudu_one_instance[0].size());
 
   // Ensure that each executor gets one range regardless of input order.
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
-    vector<vector<TScanRangeParams>> range_per_instance =
-      Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
+        Scheduler::AssignRangesToInstances(NUM_RANGES, &fs_ranges);
     EXPECT_EQ(NUM_RANGES, range_per_instance.size());
     // Confirm each range is present and each instance got exactly one range.
     vector<int> range_length_count(NUM_RANGES);
     for (const auto& instance_ranges : range_per_instance) {
       ASSERT_EQ(1, instance_ranges.size());
-      
++range_length_count[instance_ranges[0].scan_range.hdfs_file_split.length - 1];
+      
++range_length_count[instance_ranges[0].scan_range().hdfs_file_split().length()
+          - 1];
     }
     for (int i = 0; i < NUM_RANGES; ++i) {
       EXPECT_EQ(1, range_length_count[i]) << i;
@@ -767,7 +769,7 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   // across the instances regardless of input order.
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(fs_ranges.begin(), fs_ranges.end(), rng_);
-    vector<vector<TScanRangeParams>> range_per_instance =
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
         Scheduler::AssignRangesToInstances(4, &fs_ranges);
     EXPECT_EQ(4, range_per_instance.size());
     // Ensure we got a range of each length in the output.
@@ -776,8 +778,8 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
       EXPECT_EQ(4, instance_ranges.size());
       int64_t instance_bytes = 0;
       for (const auto& range : instance_ranges) {
-        instance_bytes += range.scan_range.hdfs_file_split.length;
-        ++range_length_count[range.scan_range.hdfs_file_split.length - 1];
+        instance_bytes += range.scan_range().hdfs_file_split().length();
+        ++range_length_count[range.scan_range().hdfs_file_split().length() - 
1];
       }
       // Expect each instance to get sum([1, 2, ..., 16]) / 4 bytes when 
things are
       // distributed evenly.
@@ -793,13 +795,13 @@ TEST_F(SchedulerTest, TestMultipleFinstances) {
   // range, so we just need to check the # of ranges.
   for (int attempt = 0; attempt < 20; ++attempt) {
     std::shuffle(kudu_ranges.begin(), kudu_ranges.end(), rng_);
-    vector<vector<TScanRangeParams>> range_per_instance =
+    vector<vector<ScanRangeParamsPB>> range_per_instance =
         Scheduler::AssignRangesToInstances(4, &kudu_ranges);
     EXPECT_EQ(4, range_per_instance.size());
     for (const auto& instance_ranges : range_per_instance) {
       EXPECT_EQ(4, instance_ranges.size());
       for (const auto& range : instance_ranges) {
-        EXPECT_TRUE(range.scan_range.__isset.kudu_scan_token);
+        EXPECT_TRUE(range.scan_range().has_kudu_scan_token());
       }
     }
   }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 6c29db2..dc8f1f9 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -38,6 +38,7 @@
 #include "scheduling/executor-group.h"
 #include "scheduling/hash-ring.h"
 #include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
+#include "util/compression-util.h"
 #include "util/debug-util.h"
 #include "util/flat_buffer.h"
 #include "util/hash-util.h"
@@ -45,6 +46,7 @@
 #include "util/network-util.h"
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
+#include "util/uid-util.h"
 
 #include "common/names.h"
 
@@ -226,14 +228,15 @@ void Scheduler::ComputeFragmentExecParams(
       // populate src_params->destinations
       
src_params->destinations.resize(dest_params->instance_exec_params.size());
       for (int i = 0; i < dest_params->instance_exec_params.size(); ++i) {
-        TPlanFragmentDestination& dest = src_params->destinations[i];
-        
dest.__set_fragment_instance_id(dest_params->instance_exec_params[i].instance_id);
+        PlanFragmentDestinationPB& dest = src_params->destinations[i];
+        TUniqueIdToUniqueIdPB(dest_params->instance_exec_params[i].instance_id,
+            dest.mutable_fragment_instance_id());
         const TNetworkAddress& host = 
dest_params->instance_exec_params[i].host;
-        dest.__set_thrift_backend(host);
+        *dest.mutable_thrift_backend() = FromTNetworkAddress(host);
         const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, 
host);
         DCHECK(desc.has_krpc_address());
         DCHECK(IsResolvedAddress(desc.krpc_address()));
-        dest.__set_krpc_backend(FromNetworkAddressPB(desc.krpc_address()));
+        *dest.mutable_krpc_backend() = desc.krpc_address();
       }
 
       // enumerate senders consecutively;
@@ -314,9 +317,9 @@ void Scheduler::ComputeFragmentExecParams(const 
ExecutorConfig& executor_config,
 /// Returns a numeric weight that is proportional to the estimated processing 
time for
 /// the scan range represented by 'params'. Weights from different scan node
 /// implementations, e.g. FS vs Kudu, are not comparable.
-static int64_t ScanRangeWeight(const TScanRangeParams& params) {
-  if (params.scan_range.__isset.hdfs_file_split) {
-    return params.scan_range.hdfs_file_split.length;
+static int64_t ScanRangeWeight(const ScanRangeParamsPB& params) {
+  if (params.scan_range().has_hdfs_file_split()) {
+    return params.scan_range().hdfs_file_split().length();
   } else {
     // Give equal weight to each Kudu and Hbase split.
     // TODO: implement more accurate logic for Kudu and Hbase
@@ -429,7 +432,7 @@ void Scheduler::CreateCollocatedAndScanInstances(const 
ExecutorConfig& executor_
     // The inner vectors are the output of AssignRangesToInstances().
     // The vector may be ragged - i.e. different nodes have different numbers
     // of instances.
-    vector<vector<vector<TScanRangeParams>>> per_scan_per_instance_ranges;
+    vector<vector<vector<ScanRangeParamsPB>>> per_scan_per_instance_ranges;
     for (TPlanNodeId scan_node_id : scan_node_ids) {
       // Ensure empty list is created if no scan ranges are scheduled on this 
host.
       per_scan_per_instance_ranges.emplace_back();
@@ -478,8 +481,8 @@ void Scheduler::CreateCollocatedAndScanInstances(const 
ExecutorConfig& executor_
   }
 }
 
-vector<vector<TScanRangeParams>> Scheduler::AssignRangesToInstances(
-    int max_num_instances, vector<TScanRangeParams>* ranges) {
+vector<vector<ScanRangeParamsPB>> Scheduler::AssignRangesToInstances(
+    int max_num_instances, vector<ScanRangeParamsPB>* ranges) {
   // We need to assign scan ranges to instances. We would like the assignment 
to be
   // as even as possible, so that each instance does about the same amount of 
work.
   // Use longest-processing time (LPT) algorithm, which is a good 
approximation of the
@@ -488,7 +491,7 @@ vector<vector<TScanRangeParams>> 
Scheduler::AssignRangesToInstances(
   // to each instance.
   DCHECK_GT(max_num_instances, 0);
   int num_instances = min(max_num_instances, static_cast<int>(ranges->size()));
-  vector<vector<TScanRangeParams>> per_instance_ranges(num_instances);
+  vector<vector<ScanRangeParamsPB>> per_instance_ranges(num_instances);
   if (num_instances < 2) {
     // Short-circuit the assignment algorithm for the single instance case.
     per_instance_ranges[0] = *ranges;
@@ -502,10 +505,10 @@ vector<vector<TScanRangeParams>> 
Scheduler::AssignRangesToInstances(
       instance_heap.emplace_back(InstanceAssignment{0, i});
     }
     std::sort(ranges->begin(), ranges->end(),
-        [](const TScanRangeParams& a, const TScanRangeParams& b) {
+        [](const ScanRangeParamsPB& a, const ScanRangeParamsPB& b) {
           return ScanRangeWeight(a) > ScanRangeWeight(b);
         });
-    for (TScanRangeParams& range : *ranges) {
+    for (ScanRangeParamsPB& range : *ranges) {
       per_instance_ranges[instance_heap[0].instance_idx].push_back(range);
       instance_heap[0].weight += ScanRangeWeight(range);
       pop_heap(instance_heap.begin(), instance_heap.end());
@@ -554,9 +557,10 @@ void Scheduler::CreateCollocatedJoinBuildInstances(
           parent_exec_params.krpc_host, per_fragment_instance_idx++, 
*fragment_params);
       instance_exec_params->back().num_join_build_outputs = 0;
     }
-    TJoinBuildInput build_input;
-    build_input.__set_join_node_id(sink.dest_node_id);
-    
build_input.__set_input_finstance_id(instance_exec_params->back().instance_id);
+    JoinBuildInputPB build_input;
+    build_input.set_join_node_id(sink.dest_node_id);
+    TUniqueIdToUniqueIdPB(instance_exec_params->back().instance_id,
+        build_input.mutable_input_finstance_id());
     parent_exec_params.join_build_inputs.emplace_back(build_input);
     VLOG(3) << "Linked join build for node id=" << sink.dest_node_id
             << " build finstance=" << 
PrintId(instance_exec_params->back().instance_id)
@@ -1048,6 +1052,31 @@ void Scheduler::AssignmentCtx::SelectExecutorOnHost(
   }
 }
 
+void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* 
scan_range_pb) {
+  if (tscan_range.__isset.hdfs_file_split) {
+    HdfsFileSplitPB* hdfs_file_split = 
scan_range_pb->mutable_hdfs_file_split();
+    
hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
+    hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
+    hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
+    
hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
+    hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
+    hdfs_file_split->set_file_compression(
+        THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
+    hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
+    
hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
+    hdfs_file_split->set_partition_path_hash(
+        tscan_range.hdfs_file_split.partition_path_hash);
+  }
+  if (tscan_range.__isset.hbase_key_range) {
+    HBaseKeyRangePB* hbase_key_range = 
scan_range_pb->mutable_hbase_key_range();
+    hbase_key_range->set_startkey(tscan_range.hbase_key_range.startKey);
+    hbase_key_range->set_stopkey(tscan_range.hbase_key_range.stopKey);
+  }
+  if (tscan_range.__isset.kudu_scan_token) {
+    scan_range_pb->set_kudu_scan_token(tscan_range.kudu_scan_token);
+  }
+}
+
 void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
     const BackendDescriptorPB& executor, PlanNodeId node_id,
     const vector<TNetworkAddress>& host_list,
@@ -1104,14 +1133,15 @@ void 
Scheduler::AssignmentCtx::RecordScanRangeAssignment(
 
   PerNodeScanRanges* scan_ranges = FindOrInsert(
       assignment, FromNetworkAddressPB(executor.address()), 
PerNodeScanRanges());
-  vector<TScanRangeParams>* scan_range_params_list =
-      FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
+  vector<ScanRangeParamsPB>* scan_range_params_list =
+      FindOrInsert(scan_ranges, node_id, vector<ScanRangeParamsPB>());
   // Add scan range.
-  TScanRangeParams scan_range_params;
-  scan_range_params.scan_range = scan_range_locations.scan_range;
-  scan_range_params.__set_volume_id(volume_id);
-  scan_range_params.__set_try_hdfs_cache(try_hdfs_cache);
-  scan_range_params.__set_is_remote(remote_read);
+  ScanRangeParamsPB scan_range_params;
+  TScanRangeToScanRangePB(
+      scan_range_locations.scan_range, scan_range_params.mutable_scan_range());
+  scan_range_params.set_volume_id(volume_id);
+  scan_range_params.set_try_hdfs_cache(try_hdfs_cache);
+  scan_range_params.set_is_remote(remote_read);
   scan_range_params_list->push_back(scan_range_params);
 
   if (VLOG_FILE_IS_ON) {
@@ -1133,8 +1163,8 @@ void Scheduler::AssignmentCtx::PrintAssignment(
     VLOG_FILE << "ScanRangeAssignment: server=" << 
ThriftDebugString(entry.first);
     for (const PerNodeScanRanges::value_type& per_node_scan_ranges : 
entry.second) {
       stringstream str;
-      for (const TScanRangeParams& params : per_node_scan_ranges.second) {
-        str << ThriftDebugString(params) << " ";
+      for (const ScanRangeParamsPB& params : per_node_scan_ranges.second) {
+        str << params.DebugString() << " ";
       }
       VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << 
str.str();
     }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index a620f6c..5167955 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -180,7 +180,7 @@ class Scheduler {
     /// Pick an executor in round-robin fashion from multiple executors on a 
single host.
     void SelectExecutorOnHost(const IpAddr& executor_ip, BackendDescriptorPB* 
executor);
 
-    /// Build a new TScanRangeParams object and append it to the assignment 
list for the
+    /// Build a new ScanRangeParamsPB object and append it to the assignment 
list for the
     /// tuple (executor, node_id) in 'assignment'. Also, update 
assignment_heap_ and
     /// assignment_byte_counters_, increase the counters 'total_assignments_' 
and
     /// 'total_local_assignments_'. 'scan_range_locations' contains 
information about the
@@ -388,8 +388,8 @@ class Scheduler {
   /// positive. Only returns non-empty vectors: if there are not enough ranges
   /// to create 'max_num_instances', fewer instances are assigned ranges.
   /// May reorder ranges in 'ranges'.
-  static std::vector<std::vector<TScanRangeParams>> AssignRangesToInstances(
-      int max_num_instances, std::vector<TScanRangeParams>* ranges);
+  static std::vector<std::vector<ScanRangeParamsPB>> AssignRangesToInstances(
+      int max_num_instances, std::vector<ScanRangeParamsPB>* ranges);
 
   /// For each instance of fragment_params's input fragment, create a 
collocated
   /// instance for fragment_params's fragment.
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index e3479f4..3dd2a0b 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -201,8 +201,9 @@ 
Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   query_ctx.request_pool = "fe-eval-exprs";
 
   RuntimeState state(query_ctx, ExecEnv::GetInstance());
-  TPlanFragmentCtx fragment_ctx;
-  FragmentState fragment_state(state.query_state(), fragment_ctx);
+  TPlanFragment fragment;
+  PlanFragmentCtxPB fragment_ctx;
+  FragmentState fragment_state(state.query_state(), fragment, fragment_ctx);
   // Make sure to close the runtime state no matter how this scope is exited.
   const auto close_runtime_state = MakeScopeExitTrigger([&state, 
&fragment_state]() {
     fragment_state.ReleaseResources();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 35826a4..fd8eb1a 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -42,6 +42,7 @@ add_library(Util
   codec.cc
   collection-metrics.cc
   common-metrics.cc
+  compression-util.cc
   compress.cc
   cpu-info.cc
   cyclic-barrier.cc
diff --git a/be/src/util/compression-util.cc b/be/src/util/compression-util.cc
new file mode 100644
index 0000000..3f5d55c
--- /dev/null
+++ b/be/src/util/compression-util.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "util/compression-util.h"
+
+#include "common/logging.h"
+
+namespace impala {
+
+CompressionTypePB THdfsCompressionToProto(const THdfsCompression::type& 
compression) {
+  switch(compression) {
+    case NONE: return CompressionTypePB::NONE;
+    case DEFAULT: return CompressionTypePB::DEFAULT;
+    case GZIP: return CompressionTypePB::GZIP;
+    case DEFLATE: return CompressionTypePB::DEFLATE;
+    case BZIP2: return CompressionTypePB::BZIP2;
+    case SNAPPY: return CompressionTypePB::SNAPPY;
+    case SNAPPY_BLOCKED: return CompressionTypePB::SNAPPY_BLOCKED;
+    case LZO: return CompressionTypePB::LZO;
+    case LZ4: return CompressionTypePB::LZ4;
+    case ZLIB: return CompressionTypePB::ZLIB;
+    case ZSTD: return CompressionTypePB::ZSTD;
+    case BROTLI: return CompressionTypePB::BROTLI;
+    case LZ4_BLOCKED: return CompressionTypePB::LZ4_BLOCKED;
+  }
+  DCHECK(false) << "Invalid compression type: " << compression;
+  return CompressionTypePB::NONE;
+}
+
+THdfsCompression::type CompressionTypePBToThrift(const CompressionTypePB& 
compression) {
+  switch(compression) {
+    case NONE: return THdfsCompression::NONE;
+    case DEFAULT: return THdfsCompression::DEFAULT;
+    case GZIP: return THdfsCompression::GZIP;
+    case DEFLATE: return THdfsCompression::DEFLATE;
+    case BZIP2: return THdfsCompression::BZIP2;
+    case SNAPPY: return THdfsCompression::SNAPPY;
+    case SNAPPY_BLOCKED: return THdfsCompression::SNAPPY_BLOCKED;
+    case LZO: return THdfsCompression::LZO;
+    case LZ4: return THdfsCompression::LZ4;
+    case ZLIB: return THdfsCompression::ZLIB;
+    case ZSTD: return THdfsCompression::ZSTD;
+    case BROTLI: return THdfsCompression::BROTLI;
+    case LZ4_BLOCKED: return THdfsCompression::LZ4_BLOCKED;
+  }
+  DCHECK(false) << "Invalid compression type: " << compression;
+  return THdfsCompression::NONE;
+}
+
+} // namespace impala
diff --git a/common/protobuf/row_batch.proto b/be/src/util/compression-util.h
similarity index 51%
copy from common/protobuf/row_batch.proto
copy to be/src/util/compression-util.h
index e35e5fa..6304d95 100644
--- a/common/protobuf/row_batch.proto
+++ b/be/src/util/compression-util.h
@@ -14,28 +14,18 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
-
-syntax="proto2";
 
-package impala;
+#pragma once
 
-import "common.proto";
+#include "gen-cpp/CatalogObjects_types.h"
+#include "gen-cpp/common.pb.h"
 
-// The serialized version of a header of a RowBatch (in 
be/src/runtime/row-batch.h).
-// It contains the meta-data of a row batch. The actual data of a row batch is 
sent
-// as KRPC sidecars. Please see TransmitDataRequestPB for details.
-// All fields are required in V1.
-message RowBatchHeaderPB {
-  // Total number of rows contained in this batch.
-  optional int32 num_rows = 1;
+namespace impala {
 
-  // Number of tuples per row in this batch.
-  optional int32 num_tuples_per_row = 2;
+// Convert THdfsCompression to the equivalent protobuf enum.
+CompressionTypePB THdfsCompressionToProto(const THdfsCompression::type& 
compression);
 
-  // Size of 'tuple_data' in bytes before any compression is applied.
-  optional int64 uncompressed_size = 3;
+// Convert CompressionTypePB to the equivalent thrift enum.
+THdfsCompression::type CompressionTypePBToThrift(const CompressionTypePB& 
compression);
 
-  // The compression codec (if any) used for compressing the row batch.
-  optional CompressionType compression_type = 4;
-}
+} // namespace impala
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index eeadf92..ebf3ae6 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -227,6 +227,14 @@ const V& FindWithDefault(const boost::unordered_map<K, V>& 
m, const K& key,
   return it->second;
 }
 
+template <typename K, typename V>
+const V& FindWithDefault(const google::protobuf::Map<K, V>& m, const K& key,
+                         const V& default_val) {
+  typename google::protobuf::Map<K,V>::const_iterator it = m.find(key);
+  if (it == m.end()) return default_val;
+  return it->second;
+}
+
 /// Merges (by summing) the values from two maps of values. The values must be
 /// native types or support operator +=.
 template<typename MAP_TYPE>
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index 8b0deed..89b7c83 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -48,6 +48,12 @@ inline void TUniqueIdToUniqueIdPB(
   unique_id_pb->set_hi(t_unique_id.hi);
 }
 
+inline void UniqueIdPBToTUniqueId(
+    const UniqueIdPB& unique_id_pb, TUniqueId* t_unique_id) {
+  t_unique_id->__set_lo(unique_id_pb.lo());
+  t_unique_id->__set_hi(unique_id_pb.hi());
+}
+
 /// Query id: uuid with bottom 4 bytes set to 0
 /// Fragment instance id: query id with instance index stored in the bottom 4 
bytes
 
diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt
index 4856833..9357e8a 100644
--- a/common/protobuf/CMakeLists.txt
+++ b/common/protobuf/CMakeLists.txt
@@ -22,7 +22,7 @@ add_custom_target(proto-deps)
 
 set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/)
 
-foreach(pb_src common row_batch)
+foreach(pb_src common row_batch planner)
    string(TOUPPER ${pb_src} _PB_SRC_UPPER)
    set(_PROTO_SRCS ${_PB_SRC_UPPER}_PROTO_SRCS)
    set(_PROTO_HDRS ${_PB_SRC_UPPER}_PROTO_HDRS)
diff --git a/common/protobuf/common.proto b/common/protobuf/common.proto
index 6ec84b4..68fed81 100644
--- a/common/protobuf/common.proto
+++ b/common/protobuf/common.proto
@@ -39,11 +39,22 @@ message UniqueIdPB {
   required fixed64 lo = 2;
 }
 
-// The compression codec. Currently used in row batch's header to
-// indicate the type of compression applied to the row batch.
-enum CompressionType {
-  NONE = 0; // No compression.
-  LZ4 = 1;
+// The compression codec. Currently used to indicate the compression used in
+// row batches and HDFS files. Corresponds to THdfsCompression.
+enum CompressionTypePB {
+  NONE = 0;
+  DEFAULT = 1;
+  GZIP = 2;
+  DEFLATE = 3;
+  BZIP2 = 4;
+  SNAPPY = 5;
+  SNAPPY_BLOCKED = 6;
+  LZO = 7;
+  LZ4 = 8;
+  ZLIB = 9;
+  ZSTD = 10;
+  BROTLI = 11;
+  LZ4_BLOCKED = 12;
 }
 
 // This is a union over all possible return types.
diff --git a/common/protobuf/control_service.proto 
b/common/protobuf/control_service.proto
index ddbbc68..ba5454b 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -21,6 +21,7 @@ syntax="proto2";
 package impala;
 
 import "common.proto";
+import "planner.proto";
 
 import "kudu/rpc/rpc_header.proto";
 
@@ -239,6 +240,70 @@ message RemoteShutdownResultPB {
   optional ShutdownStatusPB shutdown_status = 2;
 }
 
+// Specification of one output destination of a plan fragment
+message PlanFragmentDestinationPB {
+  // The globally unique fragment instance id.
+  optional UniqueIdPB fragment_instance_id = 1;
+
+  // Hostname + port of the Thrift based ImpalaInternalService on the 
destination.
+  optional NetworkAddressPB thrift_backend = 2;
+
+  // IP address + port of the KRPC based ImpalaInternalService on the 
destination.
+  optional NetworkAddressPB krpc_backend = 3;
+}
+
+// Context to collect information that is shared among all instances of a 
particular plan
+// fragment. Corresponds to a TPlanFragment with the same idx in the
+// TExecPlanFragmentInfo.
+message PlanFragmentCtxPB {
+  // Ordinal number of corresponding fragment in the query.
+  optional int32 fragment_idx = 1;
+
+  // Output destinations, one per output partition. The partitioning of the 
output is
+  // specified by TPlanFragment.output_sink.output_partition in the 
corresponding
+  // TPlanFragment. The number of output partitions is destinations.size().
+  repeated PlanFragmentDestinationPB destinations = 2;
+}
+
+// A scan range plus the parameters needed to execute that scan.
+message ScanRangeParamsPB {
+  optional ScanRangePB scan_range = 1;
+  optional int32 volume_id = 2 [default = -1];
+  optional bool try_hdfs_cache = 3 [default = false];
+  optional bool is_remote = 4;
+}
+
+// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
+// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support 
repeated map
+// values.
+message ScanRangesPB {
+  repeated ScanRangeParamsPB scan_ranges = 1;
+}
+
+// Information about the input fragment instance of a join node.
+message JoinBuildInputPB {
+  // The join node id that will consume this join build.
+  optional int32 join_node_id = 1;
+
+  // Fragment instance id of the input fragment instance.
+  optional UniqueIdPB input_finstance_id = 2;
+}
+
+// Protobuf portion of the execution parameters of a single fragment instance. 
Every
+// fragment instance will also have a corresponding TPlanFragmentInstanceCtx 
with the same
+// fragment_idx.
+message PlanFragmentInstanceCtxPB {
+  // Ordinal number of corresponding fragment in the query.
+  optional int32 fragment_idx = 1;
+
+  // Map from plan node id to initial scan ranges for each scan node in
+  // TPlanFragment.plan_tree
+  map<int32, ScanRangesPB> per_node_scan_ranges = 2;
+
+  // List of input join build finstances for joins in this finstance.
+  repeated JoinBuildInputPB join_build_inputs = 3;
+}
+
 // ExecQueryFInstances
 message ExecQueryFInstancesRequestPB {
   // This backend's index into Coordinator::backend_states_, needed for 
subsequent rpcs to
@@ -268,6 +333,14 @@ message ExecQueryFInstancesRequestPB {
   // The backend memory limit (in bytes) as set by the admission controller. 
Used by the
   // query mem tracker to enforce the memory limit.
   optional int64 per_backend_mem_limit = 6;
+
+  // General execution parameters for different fragments. Corresponds to 
'fragments' in
+  // the TExecPlanFragmentInfo sidecar.
+  repeated PlanFragmentCtxPB fragment_ctxs = 7;
+
+  // Execution parameters for specific fragment instances. Corresponds to
+  // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
+  repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
 }
 
 message ExecQueryFInstancesResponsePB {
diff --git a/common/protobuf/planner.proto b/common/protobuf/planner.proto
new file mode 100644
index 0000000..a22ff9b
--- /dev/null
+++ b/common/protobuf/planner.proto
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+
+package impala;
+
+import "common.proto";
+
+// Specification of a subsection of a single HDFS file. Corresponds to 
THdfsFileSplit and
+// should be kept in sync with it.
+message HdfsFileSplitPB {
+  // File name (not the full path).  The path is assumed to be relative to the
+  // 'location' of the THdfsPartition referenced by partition_id.
+  optional string relative_path = 1;
+
+  // Starting offset.
+  optional int64 offset = 2;
+
+  // Length of split.
+  optional int64 length = 3;
+
+  // ID of partition within the THdfsTable associated with this scan node.
+  optional int64 partition_id = 4;
+
+  // Total size of the hdfs file.
+  optional int64 file_length = 5;
+
+  // Compression type of the hdfs file.
+  optional CompressionTypePB file_compression = 6;
+
+  // Last modified time of the file.
+  optional int64 mtime = 7;
+
+  // Whether this file is erasure-coded.
+  optional bool is_erasure_coded = 8;
+
+  // Hash of the partition's path. This must be hashed with a hash algorithm 
that is
+  // consistent across different processes and machines. This is currently 
using
+  // Java's String.hashCode(), which is consistent. For testing purposes, this 
can use
+  // any consistent hash.
+  optional int32 partition_path_hash = 9;
+}
+
+// Key range for single THBaseScanNode. Corresponds to THBaseKeyRange and 
should be kept
+// in sync with it.
+message HBaseKeyRangePB {
+  // Inclusive
+  optional string startKey = 1;
+
+  // Exclusive
+  optional string stopKey = 2;
+}
+
+// Specification of an individual data range which is held in its entirety by 
a storage
+// server. Corresponds to TScanRange and should be kept in sync with it.
+message ScanRangePB {
+  // One of these must be set for every ScanRangePB.
+  optional HdfsFileSplitPB hdfs_file_split = 1;
+  optional HBaseKeyRangePB hbase_key_range = 2;
+  optional string kudu_scan_token = 3;
+}
diff --git a/common/protobuf/row_batch.proto b/common/protobuf/row_batch.proto
index e35e5fa..1bc9afc 100644
--- a/common/protobuf/row_batch.proto
+++ b/common/protobuf/row_batch.proto
@@ -37,5 +37,5 @@ message RowBatchHeaderPB {
   optional int64 uncompressed_size = 3;
 
   // The compression codec (if any) used for compressing the row batch.
-  optional CompressionType compression_type = 4;
+  optional CompressionTypePB compression_type = 4;
 }
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index ff35d14..4728c91 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -590,54 +590,16 @@ struct TQueryCtx {
   25: optional i64 transaction_id
 }
 
-// Specification of one output destination of a plan fragment
-struct TPlanFragmentDestination {
-  // the globally unique fragment instance id
-  1: required Types.TUniqueId fragment_instance_id
-
-  // hostname + port of the Thrift based ImpalaInteralService on the 
destination
-  2: required Types.TNetworkAddress thrift_backend
-
-  // IP address + port of the KRPC based ImpalaInternalService on the 
destination
-  3: optional Types.TNetworkAddress krpc_backend
-}
-
-// Context to collect information, which is shared among all instances of that 
plan
-// fragment.
-struct TPlanFragmentCtx {
-  1: required Planner.TPlanFragment fragment
-
-  // Output destinations, one per output partition.
-  // The partitioning of the output is specified by
-  // TPlanFragment.output_sink.output_partition.
-  // The number of output partitions is destinations.size().
-  2: list<TPlanFragmentDestination> destinations
-}
-
-// A scan range plus the parameters needed to execute that scan.
-struct TScanRangeParams {
-  1: required PlanNodes.TScanRange scan_range
-  2: optional i32 volume_id = -1
-  3: optional bool try_hdfs_cache = false
-  4: optional bool is_remote
-}
-
 // Descriptor that indicates that a runtime filter is produced by a plan node.
 struct TRuntimeFilterSource {
   1: required Types.TPlanNodeId src_node_id
   2: required i32 filter_id
 }
 
-// Information about the input fragment instance of a join node.
-struct TJoinBuildInput {
-  // The join node id that will consume this join build.
-  1: required Types.TPlanNodeId join_node_id
-
-  // Fragment instance id of the input fragment instance.
-  2: required Types.TUniqueId input_finstance_id
-}
-
-// Execution parameters of a single fragment instance.
+// The Thrift portion of the execution parameters of a single fragment 
instance. Every
+// fragment instance will also have a corresponding PlanFragmentInstanceCtxPB 
with the
+// same fragment_idx.
+// TODO: convert the rest of this struct to protobuf
 struct TPlanFragmentInstanceCtx {
   // TPlanFragment.idx
   1: required Types.TFragmentIdx fragment_idx
@@ -656,12 +618,9 @@ struct TPlanFragmentInstanceCtx {
   // Range: [0, <# of instances of parent fragment> - 1]
   3: required i32 per_fragment_instance_idx
 
-  // Initial scan ranges for each scan node in TPlanFragment.plan_tree
-  4: required map<Types.TPlanNodeId, list<TScanRangeParams>> 
per_node_scan_ranges
-
   // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
   // needed to create a DataStreamRecvr
-  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+  // TODO for per-query exec rpc: move these to PlanFragmentCtxPB
   5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
 
   // Id of this instance in its role as a sender.
@@ -672,9 +631,6 @@ struct TPlanFragmentInstanceCtx {
   // List of runtime filters produced by nodes in the finstance.
   8: optional list<TRuntimeFilterSource> filters_produced
 
-  // List of input join build finstances for joins in this finstance.
-  9: optional list<TJoinBuildInput> join_build_inputs
-
   // If this is a join build fragment, the number of fragment instances that 
consume the
   // join build. -1 = invalid.
   10: optional i32 num_join_build_outputs
@@ -694,9 +650,9 @@ enum ImpalaInternalServiceVersion {
 // serialize it ourselves and send it with ExecQueryFInstances as a sidecar.
 // TODO: investigate if it's worth converting this fully to protobuf
 struct TExecPlanFragmentInfo {
-  1: optional list<TPlanFragmentCtx> fragment_ctxs
+  1: optional list<Planner.TPlanFragment> fragments
 
-  // the order corresponds to the order of fragments in fragment_ctxs
+  // the order corresponds to the order of fragments in 'fragments'
   2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
 }
 
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 8b6687d..e0a765c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -170,7 +170,8 @@ struct TRuntimeFilterDesc {
 // - T<subclass>: all other operational parameters that are the same across
 //   all plan fragments
 
-// Specification of subsection of a single hdfs file.
+// Specification of a subsection of a single HDFS file. Corresponds to 
HdfsFileSplitPB and
+// should be kept in sync with it.
 struct THdfsFileSplit {
   // File name (not the full path).  The path is assumed to be relative to the
   // 'location' of the THdfsPartition referenced by partition_id.
@@ -204,7 +205,8 @@ struct THdfsFileSplit {
   9: required i32 partition_path_hash
 }
 
-// key range for single THBaseScanNode
+// Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and 
should be kept
+// in sync with it.
 // TODO: does 'binary' have an advantage over string? strings can
 // already store binary data
 struct THBaseKeyRange {
@@ -240,7 +242,7 @@ struct TFileSplitGeneratorSpec {
 }
 
 // Specification of an individual data range which is held in its entirety
-// by a storage server.
+// by a storage server. Corresponds to ScanRangePB and should be kept in sync 
with it.
 struct TScanRange {
   // one of these must be set for every TScanRange
   1: optional THdfsFileSplit hdfs_file_split

Reply via email to