IMPALA-3499: Split catalog update

JNI does not support writing a Java byte array larger than 2GB.
Instead of passing a single serialized update to the frontend,
this patch splits the update into a vector of updates of less
than 500MB each. These are then serialized, sent to the frontend,
deserialized, and merged before calling
Frontend::updateCatalogCache().

Change-Id: I176db25124a32944f2396ce8aafbed49cac95928
Reviewed-on: http://gerrit.cloudera.org:8080/3067
Reviewed-by: Huaisi Xu <[email protected]>
Tested-by: Huaisi Xu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7f95fe23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7f95fe23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7f95fe23

Branch: refs/heads/master
Commit: 7f95fe23f11a280d1d42ca5e99d3315b6e5fc2a9
Parents: a3a7c7c
Author: Huaisi Xu <[email protected]>
Authored: Fri May 13 14:34:40 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Mon Jun 20 15:37:17 2016 -0700

----------------------------------------------------------------------
 be/src/service/frontend.cc                      |  4 +-
 be/src/service/frontend.h                       | 10 ++---
 be/src/service/impala-server.cc                 | 41 +++++++++++++++-----
 be/src/util/jni-util.h                          | 24 ++++++++++++
 .../cloudera/impala/service/JniFrontend.java    | 26 +++++++++++--
 5 files changed, 85 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index 2b7c5b0..e6cfa0a 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -64,7 +64,7 @@ Frontend::Frontend() {
     {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_},
     {"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_},
     {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_},
-    {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_},
+    {"updateCatalogCache", "([[B)[B", &update_catalog_cache_id_},
     {"updateMembership", "([B)V", &update_membership_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
     {"describeDb", "([B)[B", &describe_db_id_},
@@ -113,7 +113,7 @@ Frontend::Frontend() {
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_));
 }
 
-Status Frontend::UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
+Status Frontend::UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& 
req,
     TUpdateCatalogCacheResponse* resp) {
   return JniUtil::CallJniMethod(fe_, update_catalog_cache_id_, req, resp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/frontend.h
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index e581bef..ac0b5d5 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -35,10 +35,10 @@ class Frontend {
   /// or if there is any further exception, the constructor will terminate the 
process.
   Frontend();
 
-  /// Request to update the Impalad catalog cache. The 
TUpdateCatalogCacheRequest contains
-  /// a list of objects that should be added/removed from the Catalog. Returns 
a response
-  /// that contains details such as the new max catalog version.
-  Status UpdateCatalogCache(const TUpdateCatalogCacheRequest& req,
+  /// Request to update the Impalad catalog cache. The req argument contains a 
vector of
+  /// updates that each contain objects that should be added/removed from the 
Catalog.
+  /// Returns a response that contains details such as the new max catalog 
version.
+  Status UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& req,
       TUpdateCatalogCacheResponse *resp);
 
   /// Request to update the Impalad frontend cluster membership snapshot.  The
@@ -175,7 +175,7 @@ class Frontend {
   jmethodID get_hadoop_config_id_;  // JniFrontend.getHadoopConfig(byte[])
   jmethodID get_hadoop_configs_id_;  // JniFrontend.getAllHadoopConfigs()
   jmethodID check_config_id_; // JniFrontend.checkConfiguration()
-  jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache()
+  jmethodID update_catalog_cache_id_; // 
JniFrontend.updateCatalogCache(byte[][])
   jmethodID update_membership_id_; // JniFrontend.updateMembership()
   jmethodID get_table_names_id_; // JniFrontend.getTableNames
   jmethodID describe_db_id_; // JniFrontend.describeDb

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 771d5b0..ec58ccc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -194,6 +194,9 @@ const string AUDIT_EVENT_LOG_FILE_PREFIX = 
"impala_audit_event_log_1.0-";
 const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-";
 
 const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536;
+// Max size for multiple update in a single split. JNI is not able to write 
java byte
+// array more than 2GB. A single topic update is not restricted by this.
+const uint64_t MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES = 500 * 1024 * 1024;
 
 const string BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string HS2_SERVER_NAME = "hiveserver2-frontend";
@@ -1240,14 +1243,17 @@ void ImpalaServer::CatalogUpdateCallback(
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;
 
-
-  // Process any updates
+  // Update catalog cache in frontend. An update is split into batches of size
+  // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. IMPALA-3499
   if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0)  {
-    TUpdateCatalogCacheRequest update_req;
-    update_req.__set_is_delta(delta.is_delta);
+    vector<TUpdateCatalogCacheRequest> update_reqs;
+    update_reqs.push_back(TUpdateCatalogCacheRequest());
+    TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back();
+    incremental_request->__set_is_delta(delta.is_delta);
     // Process all Catalog updates (new and modified objects) and determine 
what the
     // new catalog version will be.
     int64_t new_catalog_version = catalog_update_info_.catalog_version;
+    uint64_t batch_size_bytes = 0;
     for (const TTopicItem& item: delta.topic_entries) {
       uint32_t len = item.value.size();
       TCatalogObject catalog_object;
@@ -1257,12 +1263,20 @@ void ImpalaServer::CatalogUpdateCallback(
         LOG(ERROR) << "Error deserializing item: " << status.GetDetail();
         continue;
       }
+      if (len > 100 * 1024 * 1024 /* 100MB */) {
+        LOG(INFO) << "Received large catalog update(>100mb): "
+                     << item.key << " is "
+                     << PrettyPrinter::Print(len, TUnit::BYTES);
+      }
       if (catalog_object.type == TCatalogObjectType::CATALOG) {
-        
update_req.__set_catalog_service_id(catalog_object.catalog.catalog_service_id);
+        incremental_request->__set_catalog_service_id(
+            catalog_object.catalog.catalog_service_id);
         new_catalog_version = catalog_object.catalog_version;
       }
 
       // Refresh the lib cache entries of any added functions and data sources
+      // TODO: if frontend returns the list of functions and data sources, we 
do not
+      // need to deserialize these in backend.
       if (catalog_object.type == TCatalogObjectType::FUNCTION) {
         DCHECK(catalog_object.__isset.fn);
         LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location);
@@ -1272,8 +1286,16 @@ void ImpalaServer::CatalogUpdateCallback(
         
LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location);
       }
 
-      update_req.updated_objects.push_back(catalog_object);
+      if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) {
+        update_reqs.push_back(TUpdateCatalogCacheRequest());
+        incremental_request = &update_reqs.back();
+        batch_size_bytes = 0;
+      }
+      incremental_request->updated_objects.push_back(catalog_object);
+      batch_size_bytes += len;
     }
+    update_reqs.push_back(TUpdateCatalogCacheRequest());
+    TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back();
 
     // We need to look up the dropped functions and data sources and remove 
them
     // from the library cache. The data sent from the catalog service does not
@@ -1292,7 +1314,7 @@ void ImpalaServer::CatalogUpdateCallback(
                    << "Error: " << status.GetDetail();
         continue;
       }
-      update_req.removed_objects.push_back(catalog_object);
+      deletion_request->removed_objects.push_back(catalog_object);
       if (catalog_object.type == TCatalogObjectType::FUNCTION ||
           catalog_object.type == TCatalogObjectType::DATA_SOURCE) {
         TCatalogObject dropped_object;
@@ -1314,7 +1336,7 @@ void ImpalaServer::CatalogUpdateCallback(
 
     // Call the FE to apply the changes to the Impalad Catalog.
     TUpdateCatalogCacheResponse resp;
-    Status s = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp);
+    Status s = exec_env_->frontend()->UpdateCatalogCache(update_reqs, &resp);
     if (!s.ok()) {
       LOG(ERROR) << "There was an error processing the impalad catalog update. 
Requesting"
                  << " a full topic update to recover: " << s.GetDetail();
@@ -1409,7 +1431,8 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
 
      // Apply the changes to the local catalog cache.
     TUpdateCatalogCacheResponse resp;
-    Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, 
&resp);
+    Status status = exec_env_->frontend()->UpdateCatalogCache(
+        vector<TUpdateCatalogCacheRequest>{update_req}, &resp);
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
     RETURN_IF_ERROR(status);
     if (!wait_for_all_subscribers) return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index ac14f84..dc55f78 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -265,6 +265,30 @@ class JniUtil {
     return Status::OK();
   }
 
+  template <typename T, typename R>
+  static Status CallJniMethod(const jobject& obj, const jmethodID& method,
+      const vector<T>& args, R* response) {
+    JNIEnv* jni_env = getJNIEnv();
+    JniLocalFrame jni_frame;
+    RETURN_IF_ERROR(jni_frame.push(jni_env));
+    jclass jByteArray_class = jni_env->FindClass("[B");
+    jobjectArray array_of_jByteArray =
+        jni_env->NewObjectArray(args.size(), jByteArray_class, NULL);
+    RETURN_ERROR_IF_EXC(jni_env);
+    jbyteArray request_bytes;
+    for (int i = 0; i < args.size(); i++) {
+      RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &args[i], &request_bytes));
+      jni_env->SetObjectArrayElement(array_of_jByteArray, i, request_bytes);
+      RETURN_ERROR_IF_EXC(jni_env);
+      jni_env->DeleteLocalRef(request_bytes);
+    }
+    jbyteArray result_bytes = static_cast<jbyteArray>(
+        jni_env->CallObjectMethod(obj, method, array_of_jByteArray));
+    RETURN_ERROR_IF_EXC(jni_env);
+    RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response));
+    return Status::OK();
+  }
+
   template <typename T>
   static Status CallJniMethod(const jobject& obj, const jmethodID& method,
       const T& arg, std::string* response) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java 
b/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java
index b5fcbf3..2256900 100644
--- a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java
+++ b/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java
@@ -16,6 +16,7 @@ package com.cloudera.impala.service;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
@@ -80,6 +81,7 @@ import com.cloudera.impala.thrift.TShowRolesParams;
 import com.cloudera.impala.thrift.TShowRolesResult;
 import com.cloudera.impala.thrift.TShowStatsParams;
 import com.cloudera.impala.thrift.TTableName;
+import com.cloudera.impala.thrift.TUniqueId;
 import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest;
 import com.cloudera.impala.thrift.TUpdateMembershipRequest;
 import com.cloudera.impala.util.GlogAppender;
@@ -153,12 +155,28 @@ public class JniFrontend {
     }
   }
 
-  public byte[] updateCatalogCache(byte[] thriftCatalogUpdate) throws 
ImpalaException {
-    TUpdateCatalogCacheRequest req = new TUpdateCatalogCacheRequest();
-    JniUtil.deserializeThrift(protocolFactory_, req, thriftCatalogUpdate);
+  // Deserialize and merge each thrift catalog update into a single merged 
update
+  public byte[] updateCatalogCache(byte[][] thriftCatalogUpdates) throws 
ImpalaException {
+    TUniqueId defaultCatalogServiceId = new TUniqueId(0L, 0L);
+    TUpdateCatalogCacheRequest mergedUpdateRequest = new 
TUpdateCatalogCacheRequest(
+        false, defaultCatalogServiceId, new ArrayList<TCatalogObject>(),
+        new ArrayList<TCatalogObject>());
+    for (byte[] catalogUpdate: thriftCatalogUpdates) {
+      TUpdateCatalogCacheRequest incrementalRequest = new 
TUpdateCatalogCacheRequest();
+      JniUtil.deserializeThrift(protocolFactory_, incrementalRequest, 
catalogUpdate);
+      mergedUpdateRequest.is_delta |= incrementalRequest.is_delta;
+      if 
(!incrementalRequest.getCatalog_service_id().equals(defaultCatalogServiceId)) {
+        mergedUpdateRequest.setCatalog_service_id(
+            incrementalRequest.getCatalog_service_id());
+      }
+      mergedUpdateRequest.getUpdated_objects().addAll(
+          incrementalRequest.getUpdated_objects());
+      mergedUpdateRequest.getRemoved_objects().addAll(
+          incrementalRequest.getRemoved_objects());
+    }
     TSerializer serializer = new TSerializer(protocolFactory_);
     try {
-      return serializer.serialize(frontend_.updateCatalogCache(req));
+      return 
serializer.serialize(frontend_.updateCatalogCache(mergedUpdateRequest));
     } catch (TException e) {
       throw new InternalException(e.getMessage());
     }

Reply via email to