IMPALA-7436: initial fetch-from-catalogd implementation

This patch adds a new RPC to the catalogd which allows a client to fetch
a partial view of table or database metadata. Various subsets of
information can be specified and are sent back in fairly "raw" format.

A new MetaProvider implementation is added which uses this API to
support granular fetching of metadata into the impalad. The interface
had to be reworked in a few ways to support this:

- This API identifies partitions by ID instead of by name. So, the
  listPartitions API now returns opaque PartitionRefs which are passed
  back to the MetaProvider when loading more partition details. The new
  implementation stores the IDs in these refs while the direct-to-HMS
  implementation just uses names.

- The fetching of file descriptors was merged into the loading of other
  partition metadata. I couldn't think of any cases where we needed to
  list partition details without also fetching the file descriptors so
  it simplified things a bit to merge the two. This was a lot easier to
  implement for CatalogdMetaProvider since the file metadata is stored
  by partition rather than looked up by a directory as in the previous
  API.

  This necessitated moving some of the logic out of LocalFsTable into
  DirectMetaProvider, so LocalFsTable no longer deals directly with HDFS
  APIs like FileStatus.

- The handling of "default partition" for an unpartitioned table moved
  into the MetaProvider implementations themselves instead of LocalFsTable.
  This is because the CatalogdMetaProvider sees the "default partition" as a
  partition that actually has an identifier on the catalogd, whereas the
  DirectMetaProvider does not. So, now both providers export the
  "default partition" as a partition like all the others.
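
The PartitionRef and default-partition changes above can be pictured with a small Java sketch. It is a hypothetical toy, not the actual MetaProvider code: the class names `IdBasedProvider` and `NameBasedProvider`, the in-memory maps, and the empty-string name used for the synthetic default partition are all assumptions made for illustration. It shows callers treating refs as opaque tokens while each provider stores whatever identifier it needs (an ID or a name), and both providers exporting a "default partition" for an unpartitioned table like any other partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Opaque handle: callers may display the name but otherwise only pass the
// ref back to the provider that created it.
interface PartitionRef {
  String getName();
}

// Hypothetical toy modeled on the catalogd-backed provider: each partition,
// including the default partition of an unpartitioned table, has an ID.
final class IdBasedProvider {
  private static final class Ref implements PartitionRef {
    final long id;
    final String name;
    Ref(long id, String name) { this.id = id; this.name = name; }
    @Override public String getName() { return name; }
  }

  private final Map<Long, String> namesById;  // partition ID -> partition name

  IdBasedProvider(Map<Long, String> namesById) {
    this.namesById = new LinkedHashMap<>(namesById);
  }

  List<PartitionRef> listPartitions() {
    List<PartitionRef> refs = new ArrayList<>();
    namesById.forEach((id, name) -> refs.add(new Ref(id, name)));
    return refs;
  }

  // Follow-up loads key off the ID stored in the ref, not the name.
  long idFor(PartitionRef ref) { return ((Ref) ref).id; }
}

// Hypothetical toy modeled on the direct-to-HMS provider: refs carry only
// names, and an unpartitioned table gets a synthetic default partition.
final class NameBasedProvider {
  private static final class Ref implements PartitionRef {
    final String name;
    Ref(String name) { this.name = name; }
    @Override public String getName() { return name; }
  }

  static final String DEFAULT_PARTITION_NAME = "";  // assumption for illustration

  private final boolean partitioned;
  private final List<String> names;

  NameBasedProvider(boolean partitioned, List<String> names) {
    this.partitioned = partitioned;
    this.names = new ArrayList<>(names);
  }

  List<PartitionRef> listPartitions() {
    if (!partitioned) {
      return Collections.singletonList(new Ref(DEFAULT_PARTITION_NAME));
    }
    List<PartitionRef> refs = new ArrayList<>();
    for (String name : names) refs.add(new Ref(name));
    return refs;
  }
}
```

Because the caller never looks inside a ref, swapping one provider for the other does not change the calling code.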

This patch also starts to address one of the potential semantic risks of
partial caching on the impalad. If one query fetches some subset of
partitions, then a DDL occurs to change the table metadata, and another
query is submitted, we want to ensure that the metadata for the latter
query still reads a consistent snapshot. In other words, we need to
ensure that the metadata like partition list and table schema come from
the same snapshot as the finer-grained metadata like partition contents.

To implement this, the MetaProvider API now requires that
callers use a 'TableRef' object to specify the table to be read, instead
of the dbName/tableName. In the DirectMetaProvider we don't have any
convenient version numbers for a table, so the TableRef just
encapsulates the naming. In the CatalogdMetaProvider, we additionally
store the version number of the table, and then all subsequent requests
verify that the version number has not changed. If a concurrent
modification is detected, an exception is thrown. In a future patch,
I'm planning on having the frontend catch the exception and trigger a
"re-plan".
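
The version check described above can be sketched as follows. This is a hypothetical, in-memory illustration rather than the CatalogdMetaProvider code: `ToyCatalog`, the "db.table" key format, and `VersionMismatchException` (a stand-in for the patch's InconsistentMetadataFetchException, whose actual hierarchy may differ) are assumptions. The first access pins the table version in a TableRef, and every later fetch through that ref fails if the version has moved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the exception this patch throws on a concurrent
// modification (the real InconsistentMetadataFetchException may differ).
class VersionMismatchException extends RuntimeException {
  VersionMismatchException(String msg) { super(msg); }
}

// A table handle that pins the version observed when the table was first read.
final class TableRef {
  final String dbName;
  final String tableName;
  final long version;

  TableRef(String dbName, String tableName, long version) {
    this.dbName = dbName;
    this.tableName = tableName;
    this.version = version;
  }
}

// Toy catalog keyed by "db.table" that tracks a version per table.
final class ToyCatalog {
  private final Map<String, Long> versions = new HashMap<>();
  private final Map<String, String> schemas = new HashMap<>();

  // Simulates a DDL: replaces the table's schema and sets its version.
  synchronized void put(String db, String tbl, String schema, long version) {
    schemas.put(db + "." + tbl, schema);
    versions.put(db + "." + tbl, version);
  }

  // The first access captures the current version in a TableRef.
  synchronized TableRef loadTable(String db, String tbl) {
    Long v = versions.get(db + "." + tbl);
    if (v == null) throw new IllegalArgumentException("Table not found: " + db + "." + tbl);
    return new TableRef(db, tbl, v);
  }

  // Every later fetch checks that the version is unchanged, so all pieces of
  // metadata returned for one TableRef come from the same snapshot.
  synchronized String fetchSchema(TableRef ref) {
    String key = ref.dbName + "." + ref.tableName;
    Long current = versions.get(key);
    if (current == null || current != ref.version) {
      throw new VersionMismatchException("Table " + key + " changed: expected version "
          + ref.version + ", found " + current);
    }
    return schemas.get(key);
  }
}
```

A caller that catches the exception can start over with a fresh TableRef, which corresponds to the "re-plan" mentioned above.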

Change-Id: If49207fc592b1cc552fbcc7199568b6833f86901
Reviewed-on: http://gerrit.cloudera.org:8080/11182
Tested-by: Impala Public Jenkins
Reviewed-by: Vuk Ercegovac


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ef15da08
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ef15da08
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ef15da08

Branch: refs/heads/master
Commit: ef15da08aa124126805201ba0c7199e22c0dcdb7
Parents: 8360277
Author: Todd Lipcon
Authored: Thu Aug 2 01:20:23 2018 -0700
Committer: Todd Lipcon
Committed: Tue Aug 21 16:17:03 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  18 +
 be/src/catalog/catalog-service-client-wrapper.h |   8 +
 be/src/catalog/catalog.cc                       |   6 +
 be/src/catalog/catalog.h                        |   7 +
 be/src/exec/catalog-op-executor.cc              |  19 +-
 be/src/exec/catalog-op-executor.h               |   7 +
 be/src/service/fe-support.cc                    |  27 ++
 common/fbs/CMakeLists.txt                       |   2 +-
 common/thrift/CatalogService.thrift             | 128 ++++++
 .../impala/catalog/CatalogServiceCatalog.java   |  70 ++++
 .../org/apache/impala/catalog/ColumnStats.java  |  63 +++
 .../main/java/org/apache/impala/catalog/Db.java |  28 ++
 .../apache/impala/catalog/HdfsPartition.java    |  38 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  56 +++
 .../apache/impala/catalog/IncompleteTable.java  |  10 +
 .../java/org/apache/impala/catalog/Table.java   |  48 +++
 .../catalog/local/CatalogdMetaProvider.java     | 385 +++++++++++++++++++
 .../catalog/local/DirectMetaProvider.java       | 243 ++++++++++--
 .../InconsistentMetadataFetchException.java     |  37 ++
 .../impala/catalog/local/LocalCatalog.java      |   2 +-
 .../impala/catalog/local/LocalFsPartition.java  |  79 +---
 .../impala/catalog/local/LocalFsTable.java      |  98 ++---
 .../impala/catalog/local/LocalHbaseTable.java   |  12 +-
 .../impala/catalog/local/LocalKuduTable.java    |  13 +-
 .../catalog/local/LocalPartitionSpec.java       |  64 ++-
 .../apache/impala/catalog/local/LocalTable.java |  40 +-
 .../apache/impala/catalog/local/LocalView.java  |   5 +-
 .../impala/catalog/local/MetaProvider.java      |  47 ++-
 .../org/apache/impala/common/RuntimeEnv.java    |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java |   1 -
 .../impala/service/CatalogOpExecutor.java       |  58 +--
 .../org/apache/impala/service/FeSupport.java    |  14 +
 .../org/apache/impala/service/Frontend.java     |   4 +
 .../org/apache/impala/service/JniCatalog.java   |  10 +
 .../impala/catalog/HdfsPartitionTest.java       |  69 +++-
 .../impala/catalog/PartialCatalogInfoTest.java  | 183 +++++++++
 .../impala/catalog/local/LocalCatalogTest.java  |  18 +
 .../apache/impala/common/FrontendTestBase.java  |   2 +-
 38 files changed, 1640 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index a17478f..96646fb 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -124,6 +124,24 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     VLOG_RPC << "GetCatalogObject(): response=" << ThriftDebugString(resp);
   }
 
+  virtual void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& resp,
+      const TGetPartialCatalogObjectRequest& req) {
+    // TODO(todd): capture detailed metrics on the types of inbound requests, lock
+    // wait times, etc.
+    // TODO(todd): add some kind of limit on the number of concurrent requests here
+    // to avoid thread exhaustion -- eg perhaps it would be best to use a trylock
+    // on the catalog locks, or defer these calls to a separate (bounded) queue,
+    // so a heavy query workload against a table undergoing a slow refresh doesn't
+    // end up taking down the catalog by creating thousands of threads.
+    VLOG_RPC << "GetPartialCatalogObject(): request=" << ThriftDebugString(req);
+    Status status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetPartialCatalogObject(): response=" << ThriftDebugString(resp);
+  }
+
   // Prioritizes the loading of metadata for one or more catalog objects. Currently only
   // used for loading tables/views because they are the only type of object that is loaded
   // lazily.
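
The TODO in GetPartialCatalogObject() above proposes limiting concurrent requests so that a slow refresh cannot exhaust catalogd threads. This patch does not implement that. The following is only a hypothetical Java sketch of one of the suggested options, a bounded admission gate built on java.util.concurrent.Semaphore: a request that cannot obtain a permit is rejected immediately instead of tying up another thread. The class name BoundedRequestGate and the reject-rather-than-queue policy are assumptions.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical: caps the number of in-flight partial-metadata requests.
// Excess requests fail fast instead of piling up behind a slow table refresh.
final class BoundedRequestGate {
  private final Semaphore permits;

  BoundedRequestGate(int maxConcurrent) {
    permits = new Semaphore(maxConcurrent);
  }

  // Runs 'handler' if a permit is free; otherwise rejects immediately.
  <T> T call(Supplier<T> handler) {
    if (!permits.tryAcquire()) {
      throw new IllegalStateException("too many concurrent catalog requests");
    }
    try {
      return handler.get();
    } finally {
      // Always return the permit, even if the handler throws.
      permits.release();
    }
  }

  int availablePermits() {
    return permits.availablePermits();
  }
}
```

Rejecting keeps the thread count bounded at the cost of errors under load; a queue-based variant would trade added latency for fewer rejections.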

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog-service-client-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 22ac56d..46ecdf2 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -55,6 +55,14 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
     recv_GetCatalogObject(_return);
   }
 
+  void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& _return,
+      const TGetPartialCatalogObjectRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetPartialCatalogObject(req);
+    *send_done = true;
+    recv_GetPartialCatalogObject(_return);
+  }
+
   void ResetMetadata(TResetMetadataResponse& _return, const TResetMetadataRequest& req,
       bool* send_done) {
     DCHECK(!*send_done);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index dcc1657..cb445c3 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -63,6 +63,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
+    {"getPartialCatalogObject", "([B)[B", &get_partial_catalog_object_id_},
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
@@ -91,6 +92,11 @@ Status Catalog::GetCatalogObject(const TCatalogObject& req,
   return JniUtil::CallJniMethod(catalog_, get_catalog_object_id_, req, resp);
 }
 
+Status Catalog::GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& req,
+    TGetPartialCatalogObjectResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_partial_catalog_object_id_, req, resp);
+}
+
 Status Catalog::GetCatalogVersion(long* version) {
   JNIEnv* jni_env = getJNIEnv();
   JniLocalFrame jni_frame;

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 872ceca..6f7b051 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -72,6 +72,12 @@ class Catalog {
   /// information on the error will be returned.
   Status GetCatalogObject(const TCatalogObject& request, TCatalogObject* response);
 
+  /// Return partial information about a Catalog object.
+  /// Returns OK if the operation was successful, otherwise a Status object with
+  /// information on the error will be returned.
+  Status GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& request,
+      TGetPartialCatalogObjectResponse* response);
+
   /// Return all databases matching the optional argument 'pattern'.
   /// If pattern is NULL, match all databases otherwise match only those databases that
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
@@ -121,6 +127,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
+  jmethodID get_partial_catalog_object_id_;  // JniCatalog.getPartialCatalogObject()
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 164187c..d587205 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -28,8 +28,9 @@
 #include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
-#include "util/string-parser.h"
 #include "util/runtime-profile-counters.h"
+#include "util/string-parser.h"
+#include "util/test-info.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "gen-cpp/CatalogObjects_types.h"
@@ -43,6 +44,7 @@ using namespace impala;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 
+DECLARE_bool(use_local_catalog);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 
@@ -283,6 +285,21 @@ Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc,
   return Status::OK();
 }
 
+Status CatalogOpExecutor::GetPartialCatalogObject(
+    const TGetPartialCatalogObjectRequest& req,
+    TGetPartialCatalogObjectResponse* resp) {
+  DCHECK(FLAGS_use_local_catalog || TestInfo::is_test());
+  const TNetworkAddress& address =
+      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
+  Status status;
+  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status);
+  RETURN_IF_ERROR(status);
+  RETURN_IF_ERROR(
+      client.DoRpc(&CatalogServiceClientWrapper::GetPartialCatalogObject, req, resp));
+  return Status::OK();
+}
+
+
 Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
     TPrioritizeLoadResponse* result) {
   const TNetworkAddress& address =

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/exec/catalog-op-executor.h
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index 375c839..c58a99e 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -30,6 +30,9 @@ class Frontend;
 class Status;
 class RuntimeProfile;
 
+class TGetPartialCatalogObjectRequest;
+class TGetPartialCatalogObjectResponse;
+
 /// The CatalogOpExecutor is responsible for executing catalog operations.
 /// This includes DDL statements such as CREATE and ALTER as well as statements such
 /// as INVALIDATE METADATA. One CatalogOpExecutor is typically created per catalog
@@ -47,6 +50,10 @@ class CatalogOpExecutor {
   /// be loaded.
   Status GetCatalogObject(const TCatalogObject& object_desc, TCatalogObject* result);
 
+  /// Fetch partial information about a specific TCatalogObject from the catalog server.
+  Status GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& req,
+      TGetPartialCatalogObjectResponse* resp);
+
   /// Translates the given compute stats request and its child-query results into
   /// a new table alteration request for updating the stats metadata, and executes
   /// the alteration via Exec();

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 187d14e..eb64ce5 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -523,6 +523,28 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
   return result_bytes;
 }
 
+// Calls in to the catalog server to request partial information about a
+// catalog object.
+extern "C"
+JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(
+    JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
+  TGetPartialCatalogObjectRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetPartialCatalogObjectResponse result;
+  Status status = catalog_op_executor.GetPartialCatalogObject(request, &result);
+  THROW_IF_ERROR_RET(status, env, JniUtil::internal_exc_class(), nullptr);
+
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
+
 // Used to call native code from the FE to parse and set comma-delimited key=value query
 // options.
 extern "C"
@@ -581,6 +603,11 @@ static JNINativeMethod native_methods[] = {
       (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
   {
+      const_cast<char*>("NativeGetPartialCatalogObject"),
+      const_cast<char*>("([B)[B"),
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject
+  },
+  {
       const_cast<char*>("NativeParseQueryOptions"),
       const_cast<char*>("(Ljava/lang/String;[B)[B"),
       (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/common/fbs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/fbs/CMakeLists.txt b/common/fbs/CMakeLists.txt
index d0c4ef7..a55fa08 100644
--- a/common/fbs/CMakeLists.txt
+++ b/common/fbs/CMakeLists.txt
@@ -60,7 +60,7 @@ set(BE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp)
 set(FE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/fe/generated-sources/gen-java)
 file(MAKE_DIRECTORY ${FE_OUTPUT_DIR})
 file(MAKE_DIRECTORY ${BE_OUTPUT_DIR})
-set(JAVA_FE_ARGS --java -o ${FE_OUTPUT_DIR} -b)
+set(JAVA_FE_ARGS --gen-mutable --java -o ${FE_OUTPUT_DIR} -b)
 message(${JAVA_FE_ARGS})
 set(CPP_ARGS --cpp -o ${BE_OUTPUT_DIR} -b)
 message(${CPP_ARGS})

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index aaabd31..98c9b05 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -23,6 +23,7 @@ include "JniCatalog.thrift"
 include "Types.thrift"
 include "Status.thrift"
 include "Results.thrift"
+include "hive_metastore.thrift"
 
 // CatalogServer service API and related structs.
 
@@ -234,6 +235,129 @@ struct TGetFunctionsResponse {
   2: optional list<Types.TFunction> functions;
 }
 
+// Selector for partial information about Catalog-scoped objects
+// (i.e. those that are not within a particular database or table).
+struct TCatalogInfoSelector {
+  1: bool want_db_names
+  // TODO(todd): add objects like DataSources, etc.
+}
+
+// Returned info from a catalog request which selected items in
+// TCatalogInfoSelector.
+struct TPartialCatalogInfo {
+  1: list<string> db_names
+}
+
+// Selector for partial information about a Table.
+struct TTableInfoSelector {
+  // The response should include the HMS table struct.
+  1: bool want_hms_table
+
+  // If set, the response should include information about the given list of
+  // partitions. If this is unset, information about all partitions will be
+  // returned, so long as at least one of the following 'want_partition_*'
+  // flags is specified.
+  //
+  // If a partition ID is passed, but that partition does not exist in the
+  // table, then an exception will be thrown. It is assumed that the partition
+  // IDs passed here are a result of a prior successful call to fetch the partition
+  // list of this table.
+  //
+  // NOTE: "unset" and "set to empty" are different -- "set to empty" causes
+  // no partitions to be returned, whereas "unset" causes all partitions to be
+  // returned, so long as one of the following 'want_partition_*' is set.
+  2: optional list<i64> partition_ids
+
+  // ... each such partition should include its name.
+  3: bool want_partition_names
+
+  // ... each such partition should include metadata (location, etc).
+  4: bool want_partition_metadata
+
+  // ... each such partition should include its file info
+  5: bool want_partition_files
+
+  // List of columns to fetch stats for.
+  6: optional list<string> want_stats_for_column_names
+}
+
+// Returned information about a particular partition.
+struct TPartialPartitionInfo {
+  1: required i64 id
+
+  // Set if 'want_partition_names' was set in TTableInfoSelector.
+  2: optional string name
+
+  // Set if 'want_partition_metadata' was set in TTableInfoSelector.
+  3: optional hive_metastore.Partition hms_partition
+
+  // Set if 'want_partition_files' was set in TTableInfoSelector.
+  4: optional list<CatalogObjects.THdfsFileDesc> file_descriptors
+}
+
+// Returned information about a Table, as selected by TTableInfoSelector.
+struct TPartialTableInfo {
+  1: optional hive_metastore.Table hms_table
+
+  // The partition metadata for the requested partitions.
+  //
+  // If explicit partitions were passed, then it is guaranteed that this list
+  // is the same size and the same order as the requested list of IDs.
+  //
+  // See TPartialPartitionInfo for details on which fields will be set based
+  // on the caller-provided selector.
+  2: optional list<TPartialPartitionInfo> partitions
+
+  3: optional list<hive_metastore.ColumnStatisticsObj> column_stats
+
+  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
+  // Used so that each THdfsFileBlock can just reference an index in this list rather
+  // than duplicate the list of network address, which helps reduce memory usage.
+  // Only used when partition files are fetched.
+  7: optional list<Types.TNetworkAddress> network_addresses
+}
+
+// Selector for partial information about a Database.
+struct TDbInfoSelector {
+  // The response should include the HMS Database object.
+  1: bool want_hms_database
+  // The response should include the list of table names in the DB.
+  2: bool want_table_names
+  // TODO(todd): function names
+}
+
+// Returned information about a Database, as selected by TDbInfoSelector.
+struct TPartialDbInfo {
+  1: optional hive_metastore.Database hms_database
+  2: optional list<string> table_names
+}
+
+// RPC request for GetPartialCatalogObject.
+struct TGetPartialCatalogObjectRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
+
+  // A catalog object descriptor: a TCatalogObject with the object name and type fields
+  // set.
+  2: required CatalogObjects.TCatalogObject object_desc
+
+  3: optional TTableInfoSelector table_info_selector
+  4: optional TDbInfoSelector db_info_selector
+  5: optional TCatalogInfoSelector catalog_info_selector
+}
+
+// RPC response for GetPartialCatalogObject.
+struct TGetPartialCatalogObjectResponse {
+  // The status of the operation, OK if the operation was successful.
+  // Unset indicates "OK".
+  1: optional Status.TStatus status
+
+  2: optional i64 object_version_number
+  3: optional TPartialTableInfo table_info
+  4: optional TPartialDbInfo db_info
+  5: optional TPartialCatalogInfo catalog_info
+}
+
+
 // Request the complete metadata for a given catalog object. May trigger a metadata load
 // if the object is not already in the catalog cache.
 struct TGetCatalogObjectRequest {
@@ -316,4 +440,8 @@ service CatalogService {
   // TODO: When Sentry Service has a better mechanism to perform these changes this API
   // should be deprecated.
   TSentryAdminCheckResponse SentryAdminCheck(1: TSentryAdminCheckRequest req);
+
+  // Fetch partial information about some object in the catalog.
+  TGetPartialCatalogObjectResponse GetPartialCatalogObject(
+      1: TGetPartialCatalogObjectRequest req);
 }
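
The NOTE on TTableInfoSelector.partition_ids distinguishes an unset list from an empty one, and TPartialTableInfo promises that explicitly requested partitions come back in request order. A hypothetical Java sketch of how a server could resolve that field follows. A null list stands in for "unset"; the partition map, the class and method names, and the use of IllegalArgumentException in place of whatever exception the catalogd actually throws are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical resolution of the optional 'partition_ids' selector field.
final class PartitionSelection {
  // 'requestedIds' == null models an unset field; an empty list is "set to empty".
  // 'wantAnyPartitionInfo' models "at least one want_partition_* flag is set".
  static List<String> select(Map<Long, String> partitionsById, List<Long> requestedIds,
      boolean wantAnyPartitionInfo) {
    List<String> result = new ArrayList<>();
    if (requestedIds == null) {
      // Unset: return every partition, but only if some partition info was requested.
      if (wantAnyPartitionInfo) result.addAll(partitionsById.values());
      return result;
    }
    // Set (possibly empty): return exactly the requested partitions, in request order.
    for (Long id : requestedIds) {
      String partition = partitionsById.get(id);
      if (partition == null) {
        // The IDs are expected to come from a prior successful partition listing.
        throw new IllegalArgumentException("Partition id " + id + " does not exist");
      }
      result.add(partition);
    }
    return result;
  }
}
```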

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 1fc43f5..68fa274 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,10 +45,15 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalog;
+import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
+import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TGetCatalogUsageResponse;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialCatalogInfo;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TPrivilege;
@@ -1917,4 +1922,69 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.getLock().unlock();
     }
   }
+
+  /**
+   * Return a partial view of information about a given catalog object. This services
+   * the CatalogdMetaProvider running on impalads when they are configured in
+   * "local-catalog" mode.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
+    TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
+        "missing object_desc");
+    switch (objectDesc.type) {
+    case CATALOG:
+      return getPartialCatalogInfo(req);
+    case DATABASE:
+      TDatabase dbDesc = Preconditions.checkNotNull(req.object_desc.db);
+      versionLock_.readLock().lock();
+      try {
+        Db db = getDb(dbDesc.getDb_name());
+        if (db == null) {
+          throw new CatalogException(
+              "Database not found: " + req.object_desc.getDb().getDb_name());
+        }
+
+        return db.getPartialInfo(req);
+      } finally {
+        versionLock_.readLock().unlock();
+      }
+    case TABLE:
+    case VIEW: {
+      Table table = getOrLoadTable(objectDesc.getTable().getDb_name(),
+          objectDesc.getTable().getTbl_name());
+      if (table == null) {
+        throw new CatalogException("Table not found: " +
+            objectDesc.getTable().getTbl_name());
+      }
+      // TODO(todd): consider a read-write lock here.
+      table.getLock().lock();
+      try {
+        return table.getPartialInfo(req);
+      } finally {
+        table.getLock().unlock();
+      }
+    }
+    default:
+      throw new CatalogException("Unable to fetch partial info for type: " +
+          req.object_desc.type);
+    }
+  }
+
+  /**
+   * Return a partial view of information about global parts of the catalog (eg
+   * the list of tables, etc).
+   */
+  private TGetPartialCatalogObjectResponse getPartialCatalogInfo(
+      TGetPartialCatalogObjectRequest req) {
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.catalog_info = new TPartialCatalogInfo();
+    TCatalogInfoSelector sel = Preconditions.checkNotNull(req.catalog_info_selector,
+        "no catalog_info_selector in request");
+    if (sel.want_db_names) {
+      resp.catalog_info.db_names = ImmutableList.copyOf(dbCache_.get().keySet());
+    }
+    // TODO(todd) implement data sources and other global information.
+    return resp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index c798d96..d05bfa4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -33,6 +33,7 @@ import org.apache.impala.thrift.TColumnStats;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import com.google.common.math.LongMath;
 
 /**
  * Statistics for a single column.
@@ -249,6 +250,68 @@ public class ColumnStats {
   }
 
   /**
+   * Convert the statistics back into an HMS-compatible ColumnStatisticsData object.
+   * This is essentially the inverse of {@link #update(Type, ColumnStatisticsData)
+   * above.
+   *
+   * Returns null if statistics for the specified type are not supported.
+   */
+  public static ColumnStatisticsData createHiveColStatsData(
+      long capNdv, TColumnStats colStats, Type colType) {
+    ColumnStatisticsData colStatsData = new ColumnStatisticsData();
+    long ndv = colStats.getNum_distinct_values();
+    // Cap NDV at row count if available.
+    if (capNdv >= 0) ndv = Math.min(ndv, capNdv);
+
+    long numNulls = colStats.getNum_nulls();
+    switch(colType.getPrimitiveType()) {
+      case BOOLEAN:
+        colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
+        break;
+      case TINYINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case SMALLINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case INT:
+        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case BIGINT:
+      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case FLOAT:
+      case DOUBLE:
+        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
+        break;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        long maxStrLen = colStats.getMax_size();
+        double avgStrLen = colStats.getAvg_size();
+        colStatsData.setStringStats(
+            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
+        break;
+      case DECIMAL:
+        double decMaxNdv = Math.pow(10, colType.getPrecision());
+        ndv = (long) Math.min(ndv, decMaxNdv);
+        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
+        break;
+      default:
+        return null;
+    }
+    return colStatsData;
+  }
+
+  public ColumnStatisticsData toHmsCompatibleThrift(Type colType) {
+    return createHiveColStatsData(-1, toThrift(), colType);
+  }
+
+  /**
    * Sets the member corresponding to the given stats key to 'value'.
    * Requires that the given value is of a type appropriate for the
    * member being set. Throws if that is not the case.
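
createHiveColStatsData() caps the stored NDV twice: first at 'capNdv' when it is non-negative (the row count, per the code comment), then at the number of distinct values the column type can hold. For example, a TINYINT column with a recorded NDV of 1000 is exported with an NDV of 256 (2^8). The sketch below restates only the integer cases in plain Java. It uses 1L << bits in place of Guava's LongMath.pow(2, bits), which yields the same values for these widths; the class and enum names are hypothetical.

```java
// Hypothetical, simplified restatement of the integer NDV capping in
// createHiveColStatsData(); DECIMAL, string, and other types are omitted.
final class NdvCap {
  enum IntType { TINYINT, SMALLINT, INT, BIGINT }

  static long cap(long ndv, long capNdv, IntType type) {
    // Cap at the caller-supplied bound (e.g. the row count) when one is given.
    if (capNdv >= 0) ndv = Math.min(ndv, capNdv);
    // Cap at the number of distinct values the type can represent.
    switch (type) {
      case TINYINT:
        return Math.min(ndv, 1L << Byte.SIZE);     // 256
      case SMALLINT:
        return Math.min(ndv, 1L << Short.SIZE);    // 65536
      case INT:
        return Math.min(ndv, 1L << Integer.SIZE);  // 4294967296
      default:
        return ndv;                                // BIGINT: no type-based cap
    }
  }
}
```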

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index cb98c85..7e15210 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -35,8 +35,12 @@ import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialDbInfo;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 
@@ -435,4 +439,28 @@ public class Db extends CatalogObjectImpl implements FeDb {
     catalogObj.setDb(toThrift());
     return catalogObj;
   }
+
+  /**
+   * Get partial information about this DB in order to service CatalogdMetaProvider
+   * running in a remote impalad.
+   */
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) {
+    TDbInfoSelector selector = Preconditions.checkNotNull(req.db_info_selector,
+        "no db_info_selector");
+
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.setObject_version_number(getCatalogVersion());
+    resp.db_info = new TPartialDbInfo();
+    if (selector.want_hms_database) {
+      // TODO(todd): we need to deep-copy here because 'addFunction' and other DDLs
+      // modify the parameter map in place. We need to change those to copy-on-write
+      // instead to avoid this copy.
+      resp.db_info.hms_database = getMetaStoreDb().deepCopy();
+    }
+    if (selector.want_table_names) {
+      resp.db_info.table_names = getAllTableNames();
+    }
+    return resp;
+  }
 }
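`Db.getPartialInfo()` fills in only the fields that the `TDbInfoSelector` asks for, and deep-copies the HMS `Database` because some DDLs mutate its parameter map in place. The sketch below shows the same select-then-copy pattern with plain Java collections. `Selector`, `PartialInfo` and the string map standing in for the HMS object are hypothetical simplifications, not the Thrift types:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of Db.getPartialInfo(). */
final class PartialDbSketch {
  /** Stand-in for TDbInfoSelector. */
  static final class Selector {
    boolean wantParams;
    boolean wantTableNames;
  }

  /** Stand-in for TPartialDbInfo; fields that were not requested stay null. */
  static final class PartialInfo {
    Map<String, String> params;
    List<String> tableNames;
  }

  // Mutable state that DDL-like operations modify in place.
  final Map<String, String> params = new HashMap<>();
  final List<String> tables = new ArrayList<>();

  PartialInfo getPartialInfo(Selector sel) {
    PartialInfo info = new PartialInfo();
    if (sel.wantParams) {
      // Copy, so a later in-place update cannot leak into the already-built response.
      info.params = new HashMap<>(params);
    }
    if (sel.wantTableNames) {
      info.tableNames = new ArrayList<>(tables);
    }
    return info;
  }

  public static void main(String[] args) {
    PartialDbSketch db = new PartialDbSketch();
    db.params.put("owner", "alice");
    db.tables.add("t1");

    Selector sel = new Selector();
    sel.wantParams = true;
    PartialInfo info = db.getPartialInfo(sel);
    db.params.put("owner", "bob"); // simulated in-place DDL after the response was built

    System.out.println(info.params.get("owner")); // alice
    System.out.println(info.tableNames);          // null: not requested
  }
}
```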

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 14ce4e3..26ece19 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -99,6 +99,34 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     }
 
     /**
+     * Clone the descriptor, but change the replica indexes to reference the new host
+     * index 'dstIndex' instead of the original index 'origIndex'.
+     */
+    public FileDescriptor cloneWithNewHostIndex(List<TNetworkAddress> origIndex,
+        ListMap<TNetworkAddress> dstIndex) {
+      // First clone the flatbuffer with no changes.
+      ByteBuffer oldBuf = fbFileDescriptor_.getByteBuffer();
+      ByteBuffer newBuf = ByteBuffer.allocate(oldBuf.remaining());
+      newBuf.put(oldBuf.array(), oldBuf.position(), oldBuf.remaining());
+      newBuf.rewind();
+      FbFileDesc cloned = FbFileDesc.getRootAsFbFileDesc(newBuf);
+
+      // Now iterate over the blocks in the new flatbuffer and mutate the indexes.
+      FbFileBlock it = new FbFileBlock();
+      for (int i = 0; i < cloned.fileBlocksLength(); i++) {
+        it = cloned.fileBlocks(it, i);
+        for (int j = 0; j < it.replicaHostIdxsLength(); j++) {
+          int origHostIdx = FileBlock.getReplicaHostIdx(it, j);
+          boolean isCached = FileBlock.isReplicaCached(it, j);
+          TNetworkAddress origHost = origIndex.get(origHostIdx);
+          int newHostIdx = dstIndex.getIndex(origHost);
+          it.mutateReplicaHostIdxs(j, FileBlock.makeReplicaIdx(isCached, newHostIdx));
+        }
+      }
+      return new FileDescriptor(cloned);
+    }
+
+    /**
      * Creates the file descriptor of a file represented by 'fileStatus' with blocks
      * stored in 'blockLocations'. 'fileSystem' is the filesystem where the
      * file resides and 'hostIndex' stores the network addresses of the hosts that store
@@ -316,8 +344,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         TNetworkAddress networkAddress = BlockReplica.parseLocation(loc.getNames()[i]);
         short replicaIdx = (short) hostIndex.getIndex(networkAddress);
         boolean isReplicaCached = cachedHosts.contains(loc.getHosts()[i]);
-        replicaIdx = isReplicaCached ?
-            (short) (replicaIdx | ~REPLICA_HOST_IDX_MASK) : replicaIdx;
+        replicaIdx = makeReplicaIdx(isReplicaCached, replicaIdx);
         fbb.addShort(replicaIdx);
       }
       int fbReplicaHostIdxOffset = fbb.endVector();
@@ -334,6 +361,13 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       return FbFileBlock.endFbFileBlock(fbb);
     }
 
+    private static short makeReplicaIdx(boolean isReplicaCached, int hostIdx) {
+      Preconditions.checkArgument((hostIdx & REPLICA_HOST_IDX_MASK) == hostIdx,
+          "invalid hostIdx: %s", hostIdx);
+      return isReplicaCached ? (short) (hostIdx | ~REPLICA_HOST_IDX_MASK)
+          : (short)hostIdx;
+    }
+
     /**
      * Constructs an FbFileBlock object from the file block metadata that comprise block's
      * 'offset', 'length' and replica index 'replicaIdx'. Serializes the file block

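`makeReplicaIdx()` packs a host index and a "cached" flag into one `short`: the host index must fit under `REPLICA_HOST_IDX_MASK`, and every bit above the mask is set when the replica is cached. The mask's value is not shown in this excerpt; the sketch assumes `(1 << 15) - 1`, i.e. 15 bits of host index with the sign bit acting as the flag. The decoding helpers `hostIdx()` and `isCached()` are hypothetical and written for illustration; the patch itself reads these values through `FileBlock.getReplicaHostIdx()` and `FileBlock.isReplicaCached()`:

```java
/** Hypothetical sketch of the replica-index packing used by makeReplicaIdx(). */
final class ReplicaIdxSketch {
  // Assumed value; the real constant is defined outside this excerpt.
  static final int REPLICA_HOST_IDX_MASK = (1 << 15) - 1;

  static short makeReplicaIdx(boolean isReplicaCached, int hostIdx) {
    if ((hostIdx & REPLICA_HOST_IDX_MASK) != hostIdx) {
      throw new IllegalArgumentException("invalid hostIdx: " + hostIdx);
    }
    // Setting every bit above the mask turns on the short's sign bit.
    return isReplicaCached ? (short) (hostIdx | ~REPLICA_HOST_IDX_MASK) : (short) hostIdx;
  }

  /** Recovers the host index; the short is sign-extended to int before masking. */
  static int hostIdx(short replicaIdx) {
    return replicaIdx & REPLICA_HOST_IDX_MASK;
  }

  /** A replica is cached iff any bit above the mask is set. */
  static boolean isCached(short replicaIdx) {
    return (replicaIdx & ~REPLICA_HOST_IDX_MASK) != 0;
  }

  public static void main(String[] args) {
    short cached = makeReplicaIdx(true, 5);
    short plain = makeReplicaIdx(false, 5);
    System.out.println(cached + " " + hostIdx(cached) + " " + isCached(cached)); // -32763 5 true
    System.out.println(plain + " " + hostIdx(plain) + " " + isCached(plain));    // 5 5 false
  }
}
```

The range check matters for `cloneWithNewHostIndex()`: a remapped index that no longer fits under the mask would otherwise silently corrupt the cached flag.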
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 74f84e5..d4423d9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -67,9 +67,12 @@ import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -1682,6 +1685,59 @@ public class HdfsTable extends Table implements FeFsTable {
     return table;
   }
 
+  @Override
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
+
+    boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
+        req.table_info_selector.want_partition_metadata ||
+        req.table_info_selector.want_partition_names;
+
+    Collection<Long> partIds = req.table_info_selector.partition_ids;
+    if (partIds == null && wantPartitionInfo) {
+      // Caller specified at least one piece of partition info but didn't specify
+      // any partition IDs. That means they want the info for all partitions.
+      partIds = partitionMap_.keySet();
+    }
+
+    if (partIds != null) {
+      resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
+      for (long partId : partIds) {
+        HdfsPartition part = partitionMap_.get(partId);
+        Preconditions.checkArgument(part != null, "Partition id %s does not exist",
+            partId);
+        TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
+
+        if (req.table_info_selector.want_partition_names) {
+          partInfo.setName(part.getPartitionName());
+        }
+
+        if (req.table_info_selector.want_partition_metadata) {
+          partInfo.hms_partition = part.toHmsPartition();
+        }
+
+        if (req.table_info_selector.want_partition_files) {
+          List<FileDescriptor> fds = part.getFileDescriptors();
+          partInfo.file_descriptors = Lists.newArrayListWithCapacity(fds.size());
+          for (FileDescriptor fd: fds) {
+            partInfo.file_descriptors.add(fd.toThrift());
+          }
+        }
+
+        resp.table_info.partitions.add(partInfo);
+      }
+    }
+
+    if (req.table_info_selector.want_partition_files) {
+      // TODO(todd) we are sending the whole host index even if we returned only
+      // one file -- maybe not so efficient, but the alternative is to do a bunch
+      // of cloning of file descriptors which might increase memory pressure.
+      resp.table_info.setNetwork_addresses(hostIndex_.getList());
+    }
+    return resp;
+  }
+
   /**
   * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
   * information, then then all partitions and THdfsFileDescs of each partition should be

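In `HdfsTable.getPartialInfo()`, a request that asks for any partition-level field (names, metadata or files) but leaves `partition_ids` unset is treated as a request for every partition, and a request naming an unknown partition ID fails. A simplified sketch of that resolution step, using a hypothetical `resolvePartitionIds()` over a plain `Map<Long, String>` in place of `partitionMap_`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of how HdfsTable.getPartialInfo() picks partitions. */
final class PartitionSelectionSketch {
  /**
   * Returns the partition IDs to serialize, or null if no partition info is needed.
   * A null 'requestedIds' combined with a partition-level request means "all partitions".
   */
  static List<Long> resolvePartitionIds(Map<Long, String> partitionMap,
      Collection<Long> requestedIds, boolean wantPartitionInfo) {
    Collection<Long> ids = requestedIds;
    if (ids == null && wantPartitionInfo) ids = partitionMap.keySet();
    if (ids == null) return null;
    List<Long> out = new ArrayList<>(ids.size());
    for (long id : ids) {
      if (!partitionMap.containsKey(id)) {
        throw new IllegalArgumentException("Partition id " + id + " does not exist");
      }
      out.add(id);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<Long, String> parts = new TreeMap<>();
    parts.put(1L, "year=2017");
    parts.put(2L, "year=2018");
    System.out.println(resolvePartitionIds(parts, null, true));                         // [1, 2]
    System.out.println(resolvePartitionIds(parts, Collections.singletonList(2L), true)); // [2]
    System.out.println(resolvePartitionIds(parts, null, false));                        // null
  }
}
```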
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index 0cf89ab..ba3e5cf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -26,10 +26,13 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 /**
@@ -129,4 +132,11 @@ public class IncompleteTable extends Table {
       ImpalaException e) {
     return new IncompleteTable(db, name, e);
   }
+
+  @Override
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Throwables.propagateIfPossible(cause_, TableLoadingException.class);
+    throw new TableLoadingException(cause_.getMessage());
+  }
 }
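`IncompleteTable.getPartialInfo()` uses Guava's `Throwables.propagateIfPossible(cause_, TableLoadingException.class)`, which rethrows `cause_` unchanged if it is a `TableLoadingException`, a `RuntimeException` or an `Error`, and otherwise returns so that the next line can wrap the message. A plain-Java sketch of that control flow, with hypothetical exception classes standing in for `ImpalaException` and `TableLoadingException`:

```java
/** Hypothetical plain-Java sketch of IncompleteTable.getPartialInfo()'s error path. */
final class PropagateSketch {
  static class ImpalaLikeException extends Exception {
    ImpalaLikeException(String msg) { super(msg); }
  }

  static class TableLoadingLikeException extends ImpalaLikeException {
    TableLoadingLikeException(String msg) { super(msg); }
  }

  /** Always throws: rethrows 'cause' unchanged when possible, otherwise wraps its message. */
  static void rethrow(Throwable cause) throws TableLoadingLikeException {
    // Roughly what Throwables.propagateIfPossible(cause, X.class) does before returning:
    if (cause instanceof TableLoadingLikeException) throw (TableLoadingLikeException) cause;
    if (cause instanceof RuntimeException) throw (RuntimeException) cause;
    if (cause instanceof Error) throw (Error) cause;
    // ...and then the explicit fallback from the patch.
    throw new TableLoadingLikeException(cause.getMessage());
  }

  public static void main(String[] args) {
    ImpalaLikeException other = new ImpalaLikeException("HMS unreachable");
    try {
      rethrow(other);
    } catch (TableLoadingLikeException e) {
      System.out.println(e.getMessage() + " wrapped=" + (e != other)); // HMS unreachable wrapped=true
    }
  }
}
```

As in the patch, the wrapping path keeps only the message; the original exception is not attached as the cause.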

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 06c4500..d63551c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
@@ -39,8 +40,12 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnDescriptor;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
@@ -374,6 +379,49 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   }
 
   /**
+   * Return partial info about this table. This is called only on the catalogd to
+   * service GetPartialCatalogObject RPCs.
+   */
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName());
+    TTableInfoSelector selector = Preconditions.checkNotNull(req.table_info_selector,
+        "no table_info_selector");
+
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.setObject_version_number(getCatalogVersion());
+    resp.table_info = new TPartialTableInfo();
+    if (selector.want_hms_table) {
+      // TODO(todd): the deep copy could be a bit expensive. Unfortunately if we took
+      // a reference to this object, and let it escape out of the lock, it would be
+      // racy since the TTable is modified in place by some DDLs (eg 'dropTableStats').
+      // We either need to ensure that TTable is cloned on every write, or we need to
+      // ensure that serialization of the GetPartialCatalogObjectResponse object
+      // is done while we continue to hold the table lock.
+      resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
+    }
+    if (selector.want_stats_for_column_names != null) {
+      List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity(
+          selector.want_stats_for_column_names.size());
+      for (String colName: selector.want_stats_for_column_names) {
+        Column col = getColumn(colName);
+        if (col == null) continue;
+        // Ugly hack: if the catalogd has never gotten any stats from HMS, numDVs will
+        // be -1, and we'll have to send no stats to the impalad.
+        if (!col.getStats().hasNumDistinctValues()) continue;
+
+        ColumnStatisticsData tstats = col.getStats().toHmsCompatibleThrift(col.getType());
+        if (tstats == null) continue;
+        // TODO(todd): it seems like the column type is not used? maybe worth not
+        // setting it here to save serialization.
+        statsList.add(new ColumnStatisticsObj(colName, col.getType().toString(), tstats));
+      }
+      resp.table_info.setColumn_stats(statsList);
+    }
+
+    return resp;
+  }
+  /**
    * @see FeCatalogUtils#parseColumnType(FieldSchema, String)
    */
   protected Type parseColumnType(FieldSchema fs) throws TableLoadingException {

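When `want_stats_for_column_names` is set, `Table.getPartialInfo()` silently skips columns that do not exist, columns whose NDV was never loaded from HMS (`hasNumDistinctValues()` is false), and columns whose type yields no HMS-compatible stats object, so a caller has to treat a missing entry as "no stats" rather than as an error. A minimal sketch of that filter; `ColStats` is a hypothetical stand-in for `ColumnStats`, and its `hmsCompatible` flag stands in for `toHmsCompatibleThrift()` returning non-null:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the column-stats filtering in Table.getPartialInfo(). */
final class ColumnStatsFilterSketch {
  static final class ColStats {
    final long numDistinctValues; // negative (e.g. -1) means "never loaded from HMS"
    final boolean hmsCompatible;  // stands in for toHmsCompatibleThrift() != null

    ColStats(long ndv, boolean hmsCompatible) {
      this.numDistinctValues = ndv;
      this.hmsCompatible = hmsCompatible;
    }
  }

  /** Returns the subset of 'wanted' columns for which stats would be sent. */
  static List<String> columnsWithStats(Map<String, ColStats> columns, List<String> wanted) {
    List<String> out = new ArrayList<>(wanted.size());
    for (String name : wanted) {
      ColStats s = columns.get(name);
      if (s == null) continue;                 // unknown column
      if (s.numDistinctValues < 0) continue;   // no stats from HMS yet
      if (!s.hmsCompatible) continue;          // type has no HMS stats mapping
      out.add(name);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, ColStats> cols = new HashMap<>();
    cols.put("id", new ColStats(1000, true));
    cols.put("name", new ColStats(-1, true));
    cols.put("tags", new ColStats(10, false));
    System.out.println(columnsWithStats(cols, Arrays.asList("id", "name", "tags", "nope"))); // [id]
  }
}
```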
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
new file mode 100644
index 0000000..ef71e63
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TCatalogInfoSelector;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.errorprone.annotations.Immutable;
+
+/**
+ * MetaProvider which fetches metadata in a granular fashion from the catalogd.
+ */
+public class CatalogdMetaProvider implements MetaProvider {
+
+  // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
+  // of metadata. In order to incrementally build this out, we delegate various calls
+  // to the "direct" provider for now and circumvent catalogd.
+  private DirectMetaProvider directProvider_ = new DirectMetaProvider();
+
+  /**
+   * Send a GetPartialCatalogObject request to catalogd. This handles converting
+   * non-OK status responses back to exceptions, performing various generic sanity
+   * checks, etc.
+   */
+  private TGetPartialCatalogObjectResponse sendRequest(
+      TGetPartialCatalogObjectRequest req)
+      throws TException {
+    TGetPartialCatalogObjectResponse resp;
+    byte[] ret;
+    try {
+      ret = FeSupport.GetPartialCatalogObject(new TSerializer().serialize(req));
+    } catch (InternalException e) {
+      throw new TException(e);
+    }
+    resp = new TGetPartialCatalogObjectResponse();
+    new TDeserializer().deserialize(resp, ret);
+    if (resp.status.status_code != TErrorCode.OK) {
+      // TODO(todd) do reasonable error handling
+      throw new TException(resp.toString());
+    }
+
+    // If we requested information about a particular version of an object, but
+    // got back a response for a different version, then we have a case of "read skew".
+    // For example, we may have fetched the partition list of a table, performed pruning,
+    // and then tried to fetch the specific partitions needed for a query, while some
+    // concurrent DDL modified the set of partitions. This could result in an unexpected
+    // result which violates the snapshot consistency guarantees expected by users.
+    if (req.object_desc.isSetCatalog_version() &&
+        resp.isSetObject_version_number() &&
+        req.object_desc.catalog_version != resp.object_version_number) {
+      throw new InconsistentMetadataFetchException(String.format(
+          "Catalog object %s changed version from %s to %s while fetching metadata",
+          req.object_desc.toString(),
+          req.object_desc.catalog_version,
+          resp.object_version_number));
+    }
+    return resp;
+  }
+
+  @Override
+  public ImmutableList<String> loadDbList() throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForCatalog();
+    req.catalog_info_selector.want_db_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.catalog_info != null && resp.catalog_info.db_names != null, req,
+        "missing database names");
+    return ImmutableList.copyOf(resp.catalog_info.db_names);
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForCatalog() {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.CATALOG);
+    req.catalog_info_selector = new TCatalogInfoSelector();
+    return req;
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForDb(String dbName) {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.DATABASE);
+    req.object_desc.db = new TDatabase(dbName);
+    req.db_info_selector = new TDbInfoSelector();
+    return req;
+  }
+
+  @Override
+  public Database loadDb(String dbName) throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForDb(dbName);
+    req.db_info_selector.want_hms_database = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.db_info != null && resp.db_info.hms_database != null,
+        req, "missing expected HMS database");
+    return resp.db_info.hms_database;
+  }
+
+  @Override
+  public ImmutableList<String> loadTableNames(String dbName)
+      throws MetaException, UnknownDBException, TException {
+    // TODO(todd): do we ever need to fetch the DB without the table names
+    // or vice versa?
+    TGetPartialCatalogObjectRequest req = newReqForDb(dbName);
+    req.db_info_selector.want_table_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.db_info != null && resp.db_info.table_names != null,
+        req, "missing table names");
+    return ImmutableList.copyOf(resp.db_info.table_names);
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForTable(String dbName,
+      String tableName) {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable(dbName, tableName);
+    req.table_info_selector = new TTableInfoSelector();
+    return req;
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl,
+        "table ref %s was not created by CatalogdMetaProvider", table);
+    TGetPartialCatalogObjectRequest req = newReqForTable(
+        ((TableMetaRefImpl)table).dbName_,
+        ((TableMetaRefImpl)table).tableName_);
+    req.object_desc.setCatalog_version(((TableMetaRefImpl)table).catalogVersion_);
+    return req;
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
+      throws NoSuchObjectException, MetaException, TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(dbName, tableName);
+    req.table_info_selector.want_hms_table = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.hms_table != null,
+        req, "missing expected HMS table");
+    TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, resp.table_info.hms_table,
+        resp.object_version_number);
+    return Pair.create(resp.table_info.hms_table, ref);
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.want_stats_for_column_names = colNames;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
+        req, "missing column stats");
+    return resp.table_info.column_stats;
+  }
+
+  @Override
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
+      throws MetaException, TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.want_partition_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+        req, "missing partition list result");
+    List<PartitionRef> ret = Lists.newArrayListWithCapacity(
+        resp.table_info.partitions.size());
+    for (TPartialPartitionInfo p : resp.table_info.partitions) {
+      checkResponse(p.isSetId(), req, "response missing partition IDs for partition %s",
+          p);
+      ret.add(new PartitionRefImpl(p));
+    }
+    return ret;
+  }
+
+  @Override
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
+      List<String> partitionColumnNames,
+      ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs)
+      throws MetaException, TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
+
+    List<Long> ids = Lists.newArrayListWithExpectedSize(partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      ids.add(((PartitionRefImpl)ref).getId());
+    }
+
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.partition_ids = ids;
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.want_partition_files = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+        req, "missing partition list result");
+    checkResponse(resp.table_info.network_addresses != null,
+        req, "missing network addresses");
+    checkResponse(resp.table_info.partitions.size() == ids.size(),
+        req, "returned %d partitions instead of expected %d",
+        resp.table_info.partitions.size(), ids.size());
+
+    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(ids.size());
+    for (TPartialPartitionInfo part: resp.table_info.partitions) {
+      Partition msPart = part.getHms_partition();
+      if (msPart == null) {
+        checkResponse(refImpl.msTable_.getPartitionKeysSize() == 0, req,
+            "Should not return a partition with missing HMS partition unless " +
+            "the table is unpartitioned");
+        msPart = DirectMetaProvider.msTableToPartition(refImpl.msTable_);
+      }
+      checkResponse(msPart != null && msPart.getValues() != null, req,
+          "malformed partition result: %s", part.toString());
+      String partName = FileUtils.makePartName(partitionColumnNames, msPart.getValues());
+
+      checkResponse(part.file_descriptors != null, req, "missing file descriptors");
+      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(
+          part.file_descriptors.size());
+      for (THdfsFileDesc thriftFd: part.file_descriptors) {
+        FileDescriptor fd = FileDescriptor.fromThrift(thriftFd);
+        // The file descriptors returned via the RPC use host indexes that reference
+        // the 'network_addresses' list in the RPC. However, the caller may have already
+        // loaded some addresses into 'hostIndex'. So, the returned FDs need to be
+        // remapped to point to the caller's 'hostIndex' instead of the list in the
+        // RPC response.
+        fds.add(fd.cloneWithNewHostIndex(resp.table_info.network_addresses, hostIndex));
+      }
+      PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
+          ImmutableList.copyOf(fds));
+      PartitionMetadata oldVal = ret.put(partName, metaImpl);
+      if (oldVal != null) {
+        throw new RuntimeException("catalogd returned partition " + partName +
+            " multiple times");
+      }
+    }
+    return ret;
+  }
+
+  private static void checkResponse(boolean condition,
+      TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
+    if (condition) return;
+    throw new TException("Invalid response from catalogd for request " + req + ": " +
+        String.format(msg, args));
+  }
+
+  @Override
+  public String loadNullPartitionKeyValue() throws MetaException, TException {
+    return directProvider_.loadNullPartitionKeyValue();
+  }
+
+  @Override
+  public List<String> loadFunctionNames(String dbName) throws MetaException, TException {
+    return directProvider_.loadFunctionNames(dbName);
+  }
+
+  @Override
+  public Function getFunction(String dbName, String functionName)
+      throws MetaException, TException {
+    return directProvider_.getFunction(dbName, functionName);
+  }
+
+  /**
+   * Reference to a partition within a table. We remember the partition's ID and pass
+   * that back to the catalog in subsequent requests to fetch the details of the
+   * partition, since the ID is smaller than the name and provides a unique (not-reused)
+   * identifier.
+   */
+  @Immutable
+  private static class PartitionRefImpl implements PartitionRef {
+    @SuppressWarnings("Immutable") // Thrift objects are mutable, but we won't mutate it.
+    private final TPartialPartitionInfo info_;
+
+    public PartitionRefImpl(TPartialPartitionInfo p) {
+      this.info_ = p;
+    }
+
+    @Override
+    public String getName() {
+      return info_.getName();
+    }
+
+    private long getId() {
+      return info_.id;
+    }
+  }
+
+  public static class PartitionMetadataImpl implements PartitionMetadata {
+    private final Partition msPartition_;
+    private final ImmutableList<FileDescriptor> fds_;
+
+    public PartitionMetadataImpl(Partition msPartition,
+        ImmutableList<FileDescriptor> fds) {
+      this.msPartition_ = msPartition;
+      this.fds_ = fds;
+    }
+
+    @Override
+    public Partition getHmsPartition() {
+      return msPartition_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getFileDescriptors() {
+      return fds_;
+    }
+  }
+
+  /**
+   * A reference to a table that has been looked up, allowing callers to fetch further
+   * detailed information. This is more extensive than just the table name so that
+   * we can provide a consistency check that the catalog version doesn't change in
+   * between calls.
+   */
+  private static class TableMetaRefImpl implements TableMetaRef {
+    private final String dbName_;
+    private final String tableName_;
+
+    /**
+     * Stash the HMS Table object since we need this in order to handle some strange
+     * behavior whereby the catalogd returns a Partition with no HMS partition object
+     * in the case of unpartitioned tables.
+     */
+    private final Table msTable_;
+
+    /**
+     * The version of the table when we first loaded it. Subsequent requests about
+     * the table are verified against this version.
+     */
+    private final long catalogVersion_;
+
+    public TableMetaRefImpl(String dbName, String tableName,
+        Table msTable, long catalogVersion) {
+      this.dbName_ = dbName;
+      this.tableName_ = tableName;
+      this.msTable_ = msTable;
+      this.catalogVersion_ = catalogVersion;
+    }
+  }
+}
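`sendRequest()` implements an optimistic consistency check: a `TableMetaRef` remembers the catalog version seen when the table was first loaded, every follow-up request carries that version, and a response stamped with a different version raises `InconsistentMetadataFetchException` instead of silently mixing two snapshots. The sketch below models that protocol with a hypothetical in-memory `FakeCatalog`. All names are illustrative, and the exception is modeled as unchecked because its real class hierarchy is not shown in this excerpt:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the version check in CatalogdMetaProvider.sendRequest(). */
final class ReadSkewSketch {
  static final class InconsistentFetchException extends RuntimeException {
    InconsistentFetchException(String msg) { super(msg); }
  }

  /** Stand-in for catalogd: each DDL bumps the object's version. */
  static final class FakeCatalog {
    long version = 1;
    final List<String> partitions = new ArrayList<>();

    void addPartition(String name) {
      partitions.add(name);
      version++;
    }
  }

  /** Stand-in for a response: a snapshot plus the version it was read at. */
  static final class Response {
    final long version;
    final List<String> partitions;

    Response(long version, List<String> partitions) {
      this.version = version;
      this.partitions = partitions;
    }
  }

  /** A null 'expectedVersion' means "not pinned yet", as on the initial loadTable(). */
  static Response fetch(FakeCatalog catalog, Long expectedVersion) {
    Response resp = new Response(catalog.version, new ArrayList<>(catalog.partitions));
    // Same shape as sendRequest(): only checked when the request pins a version.
    if (expectedVersion != null && expectedVersion != resp.version) {
      throw new InconsistentFetchException(String.format(
          "catalog version changed from %d to %d while fetching metadata",
          expectedVersion, resp.version));
    }
    return resp;
  }

  public static void main(String[] args) {
    FakeCatalog catalog = new FakeCatalog();
    catalog.addPartition("p=1");                // version 2
    long pinned = fetch(catalog, null).version; // like loadTable(): remember the version
    fetch(catalog, pinned);                     // consistent follow-up succeeds
    catalog.addPartition("p=2");                // concurrent DDL, version 3
    try {
      fetch(catalog, pinned);
    } catch (InconsistentFetchException e) {
      // catalog version changed from 2 to 3 while fetching metadata
      System.out.println(e.getMessage());
    }
  }
}
```

How a caller recovers (for example, by retrying the whole planning step) is outside this sketch.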

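File descriptors returned by catalogd carry replica host indexes into the response's `network_addresses` list, while the caller's `hostIndex` may already hold other hosts in a different order, so `loadPartitionsByRefs()` rewrites each index via `cloneWithNewHostIndex()`. The sketch shows only the index translation, on plain lists. `HostIndex` is a hypothetical stand-in for `ListMap<TNetworkAddress>`; it assumes `getIndex()` appends a host the first time it is seen, which the remapping needs in order to work for hosts the caller has not loaded yet. The cached-replica bit is ignored here:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the host-index translation in cloneWithNewHostIndex(). */
final class HostIndexRemapSketch {
  /** Stand-in for ListMap: a list plus a reverse lookup; unseen hosts are appended. */
  static final class HostIndex {
    final List<String> hosts = new ArrayList<>();
    private final Map<String, Integer> positions = new HashMap<>();

    int getIndex(String host) {
      Integer idx = positions.get(host);
      if (idx == null) {
        idx = hosts.size();
        hosts.add(host);
        positions.put(host, idx);
      }
      return idx;
    }
  }

  /** Maps indexes into 'origHosts' (from the RPC) to indexes into 'dst' (the caller's). */
  static int[] remap(int[] replicaHostIdxs, List<String> origHosts, HostIndex dst) {
    int[] out = new int[replicaHostIdxs.length];
    for (int i = 0; i < replicaHostIdxs.length; i++) {
      out[i] = dst.getIndex(origHosts.get(replicaHostIdxs[i]));
    }
    return out;
  }

  public static void main(String[] args) {
    HostIndex callerIndex = new HostIndex();
    callerIndex.getIndex("host-b"); // loaded earlier, e.g. by another table: index 0
    List<String> rpcAddresses = Arrays.asList("host-a", "host-b");
    int[] remapped = remap(new int[] {0, 1}, rpcAddresses, callerIndex);
    System.out.println(Arrays.toString(remapped)); // [1, 0]
    System.out.println(callerIndex.hosts);         // [host-b, host-a]
  }
}
```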
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 6ac7187..c1e3675 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -17,7 +17,10 @@
 
 package org.apache.impala.catalog.local;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,17 +39,25 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.errorprone.annotations.Immutable;
 
 /**
  * Metadata provider which calls out directly to the source systems
@@ -93,11 +104,14 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public Table loadTable(String dbName, String tableName)
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
       throws MetaException, NoSuchObjectException, TException {
+    Table msTable;
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().getTable(dbName, tableName);
+      msTable = c.getHiveClient().getTable(dbName, tableName);
     }
+    TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, msTable);
+    return Pair.create(msTable, ref);
   }
 
   @Override
@@ -108,39 +122,71 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<String> loadPartitionNames(String dbName, String tableName)
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl ref = (TableMetaRefImpl)table;
+
+    // If the table isn't partitioned, just return a single partition with no name.
+    // In loadPartitionsByRefs() below, we'll detect this case and load the special
+    // unpartitioned table.
+    if (!ref.isPartitioned()) {
+      return ImmutableList.of((PartitionRef)new PartitionRefImpl(
+          PartitionRefImpl.UNPARTITIONED_NAME));
+    }
+
+    List<String> partNames;
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().listPartitionNames(dbName, tableName,
-          /*max_parts=*/(short)-1);
+      partNames = c.getHiveClient().listPartitionNames(
+          ref.dbName_, ref.tableName_, /*max_parts=*/(short)-1);
+    }
+    List<PartitionRef> partRefs = Lists.newArrayListWithCapacity(partNames.size());
+    for (String name: partNames) {
+      partRefs.add(new PartitionRefImpl(name));
     }
+    return partRefs;
   }
 
   @Override
-  public Map<String, Partition> loadPartitionsByNames(
-      String dbName, String tableName, List<String> partitionColumnNames,
-      List<String> partitionNames) throws MetaException, TException {
-    Preconditions.checkNotNull(dbName);
-    Preconditions.checkNotNull(tableName);
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(
+      TableMetaRef table, List<String> partitionColumnNames,
+      ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs) throws MetaException, TException {
+    Preconditions.checkNotNull(table);
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     Preconditions.checkArgument(!partitionColumnNames.isEmpty());
-    Preconditions.checkNotNull(partitionNames);
+    Preconditions.checkNotNull(partitionRefs);
+
+    TableMetaRefImpl tableImpl = (TableMetaRefImpl)table;
+
+    String fullTableName = tableImpl.dbName_ + "." + tableImpl.tableName_;
 
-    Map<String, Partition> ret = Maps.newHashMapWithExpectedSize(
-        partitionNames.size());
-    if (partitionNames.isEmpty()) return ret;
+    if (!((TableMetaRefImpl)table).isPartitioned()) {
+      return loadUnpartitionedPartition((TableMetaRefImpl)table, partitionRefs,
+          hostIndex);
+    }
+
+    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(
+        partitionRefs.size());
+    if (partitionRefs.isEmpty()) return ret;
 
     // Fetch the partitions.
+    List<String> partNames = Lists.newArrayListWithCapacity(partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      partNames.add(ref.getName());
+    }
+
     List<Partition> parts;
     try (MetaStoreClient c = msClientPool_.getClient()) {
       parts = MetaStoreUtil.fetchPartitionsByName(
-          c.getHiveClient(), partitionNames, dbName, tableName);
+          c.getHiveClient(), partNames, tableImpl.dbName_, tableImpl.tableName_);
     }
 
     // HMS may return fewer partition objects than requested, and the
     // returned partition objects don't carry enough information to get their
     // names. So, we map the returned partitions back to the requested names
     // using the passed-in partition column names.
-    Set<String> namesSet = ImmutableSet.copyOf(partitionNames);
+    Set<String> namesSet = ImmutableSet.copyOf(partNames);
     for (Partition p: parts) {
       List<String> vals = p.getValues();
       if (vals.size() != partitionColumnNames.size()) {
@@ -152,16 +198,50 @@ class DirectMetaProvider implements MetaProvider {
         throw new MetaException("HMS returned unexpected partition " + partName +
             " which was not requested. Requested: " + namesSet);
       }
-      Partition existing = ret.put(partName, p);
+
+      ImmutableList<FileDescriptor> fds = loadFileMetadata(
+          fullTableName, partName, p, hostIndex);
+
+      PartitionMetadata existing = ret.put(partName, new PartitionMetadataImpl(p, fds));
       if (existing != null) {
         throw new MetaException("HMS returned multiple partitions with name " +
             partName);
       }
     }
 
+
     return ret;
   }
 
+  /**
+   * We model partitions slightly differently to Hive. So, in the case of an
+   * unpartitioned table, we have to create a fake Partition object which has the
+   * metadata of the table.
+   */
+  private Map<String, PartitionMetadata> loadUnpartitionedPartition(
+      TableMetaRefImpl table, List<PartitionRef> partitionRefs,
+      ListMap<TNetworkAddress> hostIndex) {
+    Preconditions.checkArgument(partitionRefs.size() == 1,
+        "Expected exactly one partition to load for unpartitioned table");
+    PartitionRef ref = partitionRefs.get(0);
+    Preconditions.checkArgument(ref.getName().isEmpty(),
+        "Expected empty partition name for unpartitioned table");
+    Partition msPartition = msTableToPartition(table.msTable_);
+    String fullName = table.dbName_ + "." + table.tableName_;
+    ImmutableList<FileDescriptor> fds = loadFileMetadata(fullName,
+        "default",  msPartition, hostIndex);
+    return ImmutableMap.of("", (PartitionMetadata)new PartitionMetadataImpl(
+        msPartition, fds));
+  }
+
+  static Partition msTableToPartition(Table msTable) {
+    Partition msp = new Partition();
+    msp.setSd(msTable.getSd());
+    msp.setParameters(msTable.getParameters());
+    msp.setValues(Collections.<String>emptyList());
+    return msp;
+  }
+
   @Override
   public List<String> loadFunctionNames(String dbName) throws TException {
     Preconditions.checkNotNull(dbName);
@@ -183,22 +263,125 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<ColumnStatisticsObj> loadTableColumnStatistics(String dbName,
-      String tblName, List<String> colNames) throws TException {
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().getTableColumnStatistics(dbName, tblName, colNames);
+      return c.getHiveClient().getTableColumnStatistics(
+          ((TableMetaRefImpl)table).dbName_,
+          ((TableMetaRefImpl)table).tableName_,
+          colNames);
     }
   }
 
-  @Override
-  public List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException {
-    Preconditions.checkNotNull(dir);
-    Preconditions.checkArgument(dir.isAbsolute(),
-        "Must pass absolute path: %s", dir);
-    FileSystem fs = dir.getFileSystem(CONF);
-    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, /*recursive=*/false);
-    ImmutableList.Builder<LocatedFileStatus> b = new ImmutableList.Builder<>();
-    while (it.hasNext()) b.add(it.next());
-    return b.build();
+  private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
+      String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
+    Path partDir = new Path(msPartition.getSd().getLocation());
+
+    List<LocatedFileStatus> stats = Lists.newArrayList();
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, /*recursive=*/false);
+      while (it.hasNext()) stats.add(it.next());
+    } catch (FileNotFoundException fnf) {
+      // If the partition directory isn't found, this is treated as having no
+      // files.
+      return ImmutableList.of();
+    } catch (IOException ioe) {
+      throw new LocalCatalogException(String.format(
+          "Could not load files for partition %s of table %s",
+          partName, fullTableName), ioe);
+    }
+
+    HdfsTable.FileMetadataLoadStats loadStats =
+        new HdfsTable.FileMetadataLoadStats(partDir);
+
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      return ImmutableList.copyOf(
+          HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats),
+              hostIndex, loadStats));
+    } catch (IOException e) {
+        throw new LocalCatalogException(String.format(
+            "Could not convert files to descriptors for partition %s of table %s",
+            partName, fullTableName), e);
+    }
+  }
+
+  @Immutable
+  private static class PartitionRefImpl implements PartitionRef {
+    private static final String UNPARTITIONED_NAME = "";
+    private final String name_;
+
+    public PartitionRefImpl(String name) {
+      this.name_ = name;
+    }
+
+    @Override
+    public String getName() {
+      return name_;
+    }
+  }
+
+  private static class PartitionMetadataImpl implements PartitionMetadata {
+    private final Partition msPartition_;
+    private final ImmutableList<FileDescriptor> fds_;
+
+    public PartitionMetadataImpl(Partition msPartition,
+        ImmutableList<FileDescriptor> fds) {
+      this.msPartition_ = msPartition;
+      this.fds_ = fds;
+    }
+
+    @Override
+    public Partition getHmsPartition() {
+      return msPartition_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getFileDescriptors() {
+      return fds_;
+    }
+  }
+
+  private class TableMetaRefImpl implements TableMetaRef {
+
+    private final String dbName_;
+    private final String tableName_;
+    private final Table msTable_;
+
+    public TableMetaRefImpl(String dbName, String tableName, Table msTable) {
+      this.dbName_ = dbName;
+      this.tableName_ = tableName;
+      this.msTable_ = msTable;
+    }
+
+    private boolean isPartitioned() {
+      return msTable_.getPartitionKeysSize() != 0;
+    }
+  }
+
+
+  /**
+   * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>.
+   * This is necessary because the existing code to convert file statuses to
+   * descriptors consumes the remote iterator directly and thus avoids materializing
+   * all of the LocatedFileStatus objects in memory at the same time.
+   */
+  private static class FakeRemoteIterator<T> implements RemoteIterator<T> {
+    private final Iterator<T> it_;
+
+    FakeRemoteIterator(Iterable<T> it) {
+      this.it_ = it.iterator();
+    }
+    @Override
+    public boolean hasNext() throws IOException {
+      return it_.hasNext();
+    }
+
+    @Override
+    public T next() throws IOException {
+      return it_.next();
+    }
   }
 }

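loadPartitionsByRefs() copes with HMS returning fewer partitions than requested, and without their names, by rebuilding each name from the partition column names and the returned values and checking it against the requested set. The sketch below illustrates that mapping under simplifying assumptions: PartitionNameMapper and its methods are hypothetical, plain value lists stand in for HMS Partition objects, and names are built as col=val pairs joined with '/' without the escaping Hive applies to special characters.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: map returned partition values back to requested names. */
public class PartitionNameMapper {
  /** Builds a partition name such as "year=2018/month=1" (no escaping). */
  static String makePartName(List<String> colNames, List<String> vals) {
    if (colNames.size() != vals.size()) {
      throw new IllegalArgumentException("Expected " + colNames.size()
          + " partition values, got " + vals.size());
    }
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < colNames.size(); i++) {
      if (i > 0) sb.append('/');
      sb.append(colNames.get(i)).append('=').append(vals.get(i));
    }
    return sb.toString();
  }

  /**
   * Keys each returned value list by its reconstructed name. Unrequested or
   * duplicate partitions are errors; requested partitions that were not
   * returned (for example, because they no longer exist) are simply absent.
   */
  static Map<String, List<String>> mapToRequested(List<String> colNames,
      Set<String> requested, List<List<String>> returnedVals) {
    Map<String, List<String>> ret = new HashMap<>();
    for (List<String> vals : returnedVals) {
      String name = makePartName(colNames, vals);
      if (!requested.contains(name)) {
        throw new IllegalStateException("Unexpected partition " + name
            + "; requested: " + requested);
      }
      if (ret.put(name, vals) != null) {
        throw new IllegalStateException("Multiple partitions named " + name);
      }
    }
    return ret;
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("year", "month");
    Set<String> requested = new HashSet<>(
        Arrays.asList("year=2018/month=1", "year=2018/month=2"));
    // Only one of the two requested partitions comes back.
    List<List<String>> returned = Arrays.asList(Arrays.asList("2018", "1"));
    System.out.println(mapToRequested(cols, requested, returned).keySet());
  }
}
```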
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
new file mode 100644
index 0000000..9147cec
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+/**
+ * If this is thrown, it indicates that the catalog implementation in the Impalad
+ * has identified that the metadata it read was not a proper snapshot of the source
+ * metadata. In other words, the resulting plan may be incorrect and so the plan
+ * in progress should be discarded and retried. It should be assumed that, if this
+ * exception is thrown, the catalog has already taken appropriate steps to ensure that
+ * a retry will not encounter the same inconsistency.
+ *
+ * Note that the above does not guarantee that a retry will succeed, only that it will
+ * not encounter the _same_ conflict.
+ */
+public class InconsistentMetadataFetchException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public InconsistentMetadataFetchException(String msg) {
+    super(msg);
+  }
+}

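InconsistentMetadataFetchException tells the caller to discard the plan in progress and retry, on the assumption that the catalog has already dealt with the cause of that particular conflict. One way a caller might handle it is a bounded retry loop, sketched below. PlanRetrier, planWithRetries() and the local exception class are hypothetical, and the attempt limit is an assumption motivated by the note that a retry may still hit a different conflict.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: retry planning when the metadata snapshot was inconsistent. */
public class PlanRetrier {
  /** Local stand-in for InconsistentMetadataFetchException. */
  public static class InconsistentMetadataException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public InconsistentMetadataException(String msg) {
      super(msg);
    }
  }

  /**
   * Runs the planner up to maxAttempts times. Each inconsistency discards the
   * partial result and starts over; if every attempt fails, the last exception
   * is rethrown, since a retry is not guaranteed to succeed.
   */
  public static <T> T planWithRetries(Supplier<T> planner, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    InconsistentMetadataException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return planner.get();
      } catch (InconsistentMetadataException e) {
        // Assumes the catalog already invalidated whatever caused this conflict.
        last = e;
      }
    }
    throw last;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String plan = planWithRetries(() -> {
      if (++calls[0] < 3) throw new InconsistentMetadataException("stale table");
      return "plan after " + calls[0] + " attempts";
    }, 5);
    System.out.println(plan);
  }
}
```

Making the exception unchecked (as the patch does by extending RuntimeException) lets it propagate through planner code without every method declaring it; only the retry point needs to catch it.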
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 37b25b4..8d57210 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -72,7 +72,7 @@ public class LocalCatalog implements FeCatalog {
   private final String defaultKuduMasterHosts_;
 
   public static LocalCatalog create(String defaultKuduMasterHosts) {
-    return new LocalCatalog(new DirectMetaProvider(), defaultKuduMasterHosts);
+    return new LocalCatalog(new CatalogdMetaProvider(), defaultKuduMasterHosts);
   }
 
   private LocalCatalog(MetaProvider metaProvider, String defaultKuduMasterHosts) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 2e35ce8..9cf2d8b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -17,17 +17,13 @@
 
 package org.apache.impala.catalog.local;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -38,7 +34,6 @@ import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PartitionStatsUtil;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
@@ -49,17 +44,21 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 public class LocalFsPartition implements FeFsPartition {
-  private static final Configuration CONF = new Configuration();
   private final LocalFsTable table_;
   private final LocalPartitionSpec spec_;
   private final Partition msPartition_;
-  private ImmutableList<FileDescriptor> fileDescriptors_;
+  /**
+   * Null in the case of a 'prototype partition'.
+   */
+  @Nullable
+  private final ImmutableList<FileDescriptor> fileDescriptors_;
 
   public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec,
-      Partition msPartition) {
+      Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors) {
     table_ = Preconditions.checkNotNull(table);
     spec_ = Preconditions.checkNotNull(spec);
     msPartition_ = Preconditions.checkNotNull(msPartition);
+    fileDescriptors_ = fileDescriptors;
   }
 
   @Override
@@ -79,19 +78,16 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public List<FileDescriptor> getFileDescriptors() {
-    loadFileDescriptors();
     return fileDescriptors_;
   }
 
   @Override
   public boolean hasFileDescriptors() {
-    loadFileDescriptors();
     return !fileDescriptors_.isEmpty();
   }
 
   @Override
   public int getNumFileDescriptors() {
-    loadFileDescriptors();
     return fileDescriptors_.size();
   }
 
@@ -172,7 +168,6 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public long getSize() {
-    loadFileDescriptors();
     long size = 0;
     for (FileDescriptor fd : fileDescriptors_) {
       size += fd.getFileLength();
@@ -218,60 +213,4 @@ public class LocalFsPartition implements FeFsPartition {
     return Maps.filterKeys(getParameters(),
         HdfsPartition.IS_NOT_INCREMENTAL_STATS_KEY);
   }
-
-
-  private void loadFileDescriptors() {
-    if (fileDescriptors_ != null) return;
-    Path partDir = getLocationPath();
-    List<LocatedFileStatus> stats;
-    try {
-      stats = table_.db_.getCatalog().getMetaProvider().loadFileMetadata(partDir);
-    } catch (FileNotFoundException fnf) {
-      // If the partition directory isn't found, this is treated as having no
-      // files.
-      fileDescriptors_ = ImmutableList.of();
-      return;
-    } catch (IOException ioe) {
-      throw new LocalCatalogException(String.format(
-          "Could not load files for partition %s of table %s",
-          spec_.getName(), table_.getFullName()), ioe);
-    }
-
-    HdfsTable.FileMetadataLoadStats loadStats =
-        new HdfsTable.FileMetadataLoadStats(partDir);
-
-    try {
-      FileSystem fs = partDir.getFileSystem(CONF);
-      fileDescriptors_ = ImmutableList.copyOf(
-          HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats),
-              table_.getHostIndex(), loadStats));
-    } catch (IOException e) {
-        throw new LocalCatalogException(String.format(
-            "Could not convert files to descriptors for partition %s of table %s",
-            spec_.getName(), table_.getFullName()), e);
-    }
-  }
-
-  /**
-   * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>.
-   * This is necessary because the existing code to convert file statuses to
-   * descriptors consumes the remote iterator directly and thus avoids materializing
-   * all of the LocatedFileStatus objects in memory at the same time.
-   */
-  private static class FakeRemoteIterator<T> implements RemoteIterator<T> {
-    private final Iterator<T> it_;
-
-    FakeRemoteIterator(Iterable<T> it) {
-      this.it_ = it.iterator();
-    }
-    @Override
-    public boolean hasNext() throws IOException {
-      return it_.hasNext();
-    }
-
-    @Override
-    public T next() throws IOException {
-      return it_.next();
-    }
-  }
 }

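With this change LocalFsPartition no longer loads its files lazily: the MetaProvider hands the file descriptors to the constructor, and the field is null only for a "prototype partition". A self-contained sketch of that shape follows. EagerPartition and FileDesc are hypothetical stand-ins for LocalFsPartition and HdfsPartition.FileDescriptor, and the sketch makes the prototype case explicit by throwing IllegalStateException from its file accessors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch: a partition that receives its file list up front
 * instead of loading it on first access. A null list marks a prototype.
 */
public class EagerPartition {
  /** Minimal stand-in for a file descriptor: a path and a length in bytes. */
  public static final class FileDesc {
    final String path;
    final long length;

    public FileDesc(String path, long length) {
      this.path = path;
      this.length = length;
    }
  }

  private final List<FileDesc> fileDescs; // null for a prototype partition

  public EagerPartition(List<FileDesc> fileDescs) {
    // Defensive copy so the partition stays immutable after construction.
    this.fileDescs = fileDescs == null
        ? null : Collections.unmodifiableList(new ArrayList<>(fileDescs));
  }

  public boolean isPrototype() {
    return fileDescs == null;
  }

  private List<FileDesc> files() {
    if (fileDescs == null) {
      throw new IllegalStateException("prototype partition has no files");
    }
    return fileDescs;
  }

  public int getNumFileDescriptors() {
    return files().size();
  }

  /** Total size in bytes: the sum of all file lengths. */
  public long getSize() {
    long size = 0;
    for (FileDesc fd : files()) size += fd.length;
    return size;
  }

  public static void main(String[] args) {
    EagerPartition p = new EagerPartition(Arrays.asList(
        new FileDesc("/warehouse/t/year=2018/f1.parq", 10L),
        new FileDesc("/warehouse/t/year=2018/f2.parq", 32L)));
    System.out.println(p.getNumFileDescriptors() + " files, " + p.getSize() + " bytes");
  }
}
```

Loading eagerly moves all I/O (and its failure modes) into the provider, so accessors such as getSize() become simple reads with no hidden filesystem calls.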