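IMPALA-3499: Split catalog update

JNI does not support writing a Java byte array larger than 2GB. Instead of passing a single serialized update to the frontend, this patch splits the update into a vector of smaller updates. Added and modified catalog objects are grouped into batches of at most 500MB each; a single object larger than that still gets a batch of its own. Removed objects are sent in one final batch. The batches are serialized, sent to the frontend, then deserialized and merged into a single update before calling Frontend.updateCatalogCache().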
Change-Id: I176db25124a32944f2396ce8aafbed49cac95928 Reviewed-on: http://gerrit.cloudera.org:8080/3067 Reviewed-by: Huaisi Xu <[email protected]> Tested-by: Huaisi Xu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7f95fe23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7f95fe23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7f95fe23 Branch: refs/heads/master Commit: 7f95fe23f11a280d1d42ca5e99d3315b6e5fc2a9 Parents: a3a7c7c Author: Huaisi Xu <[email protected]> Authored: Fri May 13 14:34:40 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Mon Jun 20 15:37:17 2016 -0700 ---------------------------------------------------------------------- be/src/service/frontend.cc | 4 +- be/src/service/frontend.h | 10 ++--- be/src/service/impala-server.cc | 41 +++++++++++++++----- be/src/util/jni-util.h | 24 ++++++++++++ .../cloudera/impala/service/JniFrontend.java | 26 +++++++++++-- 5 files changed, 85 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/frontend.cc ---------------------------------------------------------------------- diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc index 2b7c5b0..e6cfa0a 100644 --- a/be/src/service/frontend.cc +++ b/be/src/service/frontend.cc @@ -64,7 +64,7 @@ Frontend::Frontend() { {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_}, {"getAllHadoopConfigs", "()[B", &get_hadoop_configs_id_}, {"checkConfiguration", "()Ljava/lang/String;", &check_config_id_}, - {"updateCatalogCache", "([B)[B", &update_catalog_cache_id_}, + {"updateCatalogCache", "([[B)[B", &update_catalog_cache_id_}, {"updateMembership", "([B)V", &update_membership_id_}, {"getTableNames", "([B)[B", &get_table_names_id_}, {"describeDb", "([B)[B", 
&describe_db_id_}, @@ -113,7 +113,7 @@ Frontend::Frontend() { ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_)); } -Status Frontend::UpdateCatalogCache(const TUpdateCatalogCacheRequest& req, +Status Frontend::UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& req, TUpdateCatalogCacheResponse* resp) { return JniUtil::CallJniMethod(fe_, update_catalog_cache_id_, req, resp); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/frontend.h ---------------------------------------------------------------------- diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h index e581bef..ac0b5d5 100644 --- a/be/src/service/frontend.h +++ b/be/src/service/frontend.h @@ -35,10 +35,10 @@ class Frontend { /// or if there is any further exception, the constructor will terminate the process. Frontend(); - /// Request to update the Impalad catalog cache. The TUpdateCatalogCacheRequest contains - /// a list of objects that should be added/removed from the Catalog. Returns a response - /// that contains details such as the new max catalog version. - Status UpdateCatalogCache(const TUpdateCatalogCacheRequest& req, + /// Request to update the Impalad catalog cache. The req argument contains a vector of + /// updates that each contain objects that should be added/removed from the Catalog. + /// Returns a response that contains details such as the new max catalog version. + Status UpdateCatalogCache(const vector<TUpdateCatalogCacheRequest>& req, TUpdateCatalogCacheResponse *resp); /// Request to update the Impalad frontend cluster membership snapshot. 
The @@ -175,7 +175,7 @@ class Frontend { jmethodID get_hadoop_config_id_; // JniFrontend.getHadoopConfig(byte[]) jmethodID get_hadoop_configs_id_; // JniFrontend.getAllHadoopConfigs() jmethodID check_config_id_; // JniFrontend.checkConfiguration() - jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache() + jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache(byte[][]) jmethodID update_membership_id_; // JniFrontend.updateMembership() jmethodID get_table_names_id_; // JniFrontend.getTableNames jmethodID describe_db_id_; // JniFrontend.describeDb http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 771d5b0..ec58ccc 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -194,6 +194,9 @@ const string AUDIT_EVENT_LOG_FILE_PREFIX = "impala_audit_event_log_1.0-"; const string LINEAGE_LOG_FILE_PREFIX = "impala_lineage_log_1.0-"; const uint32_t MAX_CANCELLATION_QUEUE_SIZE = 65536; +// Max size for multiple update in a single split. JNI is not able to write java byte +// array more than 2GB. A single topic update is not restricted by this. +const uint64_t MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES = 500 * 1024 * 1024; const string BEESWAX_SERVER_NAME = "beeswax-frontend"; const string HS2_SERVER_NAME = "hiveserver2-frontend"; @@ -1240,14 +1243,17 @@ void ImpalaServer::CatalogUpdateCallback( if (topic == incoming_topic_deltas.end()) return; const TTopicDelta& delta = topic->second; - - // Process any updates + // Update catalog cache in frontend. An update is split into batches of size + // MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES each for multiple updates. 
IMPALA-3499 if (delta.topic_entries.size() != 0 || delta.topic_deletions.size() != 0) { - TUpdateCatalogCacheRequest update_req; - update_req.__set_is_delta(delta.is_delta); + vector<TUpdateCatalogCacheRequest> update_reqs; + update_reqs.push_back(TUpdateCatalogCacheRequest()); + TUpdateCatalogCacheRequest* incremental_request = &update_reqs.back(); + incremental_request->__set_is_delta(delta.is_delta); // Process all Catalog updates (new and modified objects) and determine what the // new catalog version will be. int64_t new_catalog_version = catalog_update_info_.catalog_version; + uint64_t batch_size_bytes = 0; for (const TTopicItem& item: delta.topic_entries) { uint32_t len = item.value.size(); TCatalogObject catalog_object; @@ -1257,12 +1263,20 @@ void ImpalaServer::CatalogUpdateCallback( LOG(ERROR) << "Error deserializing item: " << status.GetDetail(); continue; } + if (len > 100 * 1024 * 1024 /* 100MB */) { + LOG(INFO) << "Received large catalog update(>100mb): " + << item.key << " is " + << PrettyPrinter::Print(len, TUnit::BYTES); + } if (catalog_object.type == TCatalogObjectType::CATALOG) { - update_req.__set_catalog_service_id(catalog_object.catalog.catalog_service_id); + incremental_request->__set_catalog_service_id( + catalog_object.catalog.catalog_service_id); new_catalog_version = catalog_object.catalog_version; } // Refresh the lib cache entries of any added functions and data sources + // TODO: if frontend returns the list of functions and data sources, we do not + // need to deserialize these in backend. 
if (catalog_object.type == TCatalogObjectType::FUNCTION) { DCHECK(catalog_object.__isset.fn); LibCache::instance()->SetNeedsRefresh(catalog_object.fn.hdfs_location); @@ -1272,8 +1286,16 @@ void ImpalaServer::CatalogUpdateCallback( LibCache::instance()->SetNeedsRefresh(catalog_object.data_source.hdfs_location); } - update_req.updated_objects.push_back(catalog_object); + if (batch_size_bytes + len > MAX_CATALOG_UPDATE_BATCH_SIZE_BYTES) { + update_reqs.push_back(TUpdateCatalogCacheRequest()); + incremental_request = &update_reqs.back(); + batch_size_bytes = 0; + } + incremental_request->updated_objects.push_back(catalog_object); + batch_size_bytes += len; } + update_reqs.push_back(TUpdateCatalogCacheRequest()); + TUpdateCatalogCacheRequest* deletion_request = &update_reqs.back(); // We need to look up the dropped functions and data sources and remove them // from the library cache. The data sent from the catalog service does not @@ -1292,7 +1314,7 @@ void ImpalaServer::CatalogUpdateCallback( << "Error: " << status.GetDetail(); continue; } - update_req.removed_objects.push_back(catalog_object); + deletion_request->removed_objects.push_back(catalog_object); if (catalog_object.type == TCatalogObjectType::FUNCTION || catalog_object.type == TCatalogObjectType::DATA_SOURCE) { TCatalogObject dropped_object; @@ -1314,7 +1336,7 @@ void ImpalaServer::CatalogUpdateCallback( // Call the FE to apply the changes to the Impalad Catalog. TUpdateCatalogCacheResponse resp; - Status s = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp); + Status s = exec_env_->frontend()->UpdateCatalogCache(update_reqs, &resp); if (!s.ok()) { LOG(ERROR) << "There was an error processing the impalad catalog update. Requesting" << " a full topic update to recover: " << s.GetDetail(); @@ -1409,7 +1431,8 @@ Status ImpalaServer::ProcessCatalogUpdateResult( // Apply the changes to the local catalog cache. 
TUpdateCatalogCacheResponse resp; - Status status = exec_env_->frontend()->UpdateCatalogCache(update_req, &resp); + Status status = exec_env_->frontend()->UpdateCatalogCache( + vector<TUpdateCatalogCacheRequest>{update_req}, &resp); if (!status.ok()) LOG(ERROR) << status.GetDetail(); RETURN_IF_ERROR(status); if (!wait_for_all_subscribers) return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/be/src/util/jni-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index ac14f84..dc55f78 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -265,6 +265,30 @@ class JniUtil { return Status::OK(); } + template <typename T, typename R> + static Status CallJniMethod(const jobject& obj, const jmethodID& method, + const vector<T>& args, R* response) { + JNIEnv* jni_env = getJNIEnv(); + JniLocalFrame jni_frame; + RETURN_IF_ERROR(jni_frame.push(jni_env)); + jclass jByteArray_class = jni_env->FindClass("[B"); + jobjectArray array_of_jByteArray = + jni_env->NewObjectArray(args.size(), jByteArray_class, NULL); + RETURN_ERROR_IF_EXC(jni_env); + jbyteArray request_bytes; + for (int i = 0; i < args.size(); i++) { + RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &args[i], &request_bytes)); + jni_env->SetObjectArrayElement(array_of_jByteArray, i, request_bytes); + RETURN_ERROR_IF_EXC(jni_env); + jni_env->DeleteLocalRef(request_bytes); + } + jbyteArray result_bytes = static_cast<jbyteArray>( + jni_env->CallObjectMethod(obj, method, array_of_jByteArray)); + RETURN_ERROR_IF_EXC(jni_env); + RETURN_IF_ERROR(DeserializeThriftMsg(jni_env, result_bytes, response)); + return Status::OK(); + } + template <typename T> static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg, std::string* response) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7f95fe23/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java b/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java index b5fcbf3..2256900 100644 --- a/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/JniFrontend.java @@ -16,6 +16,7 @@ package com.cloudera.impala.service; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; @@ -80,6 +81,7 @@ import com.cloudera.impala.thrift.TShowRolesParams; import com.cloudera.impala.thrift.TShowRolesResult; import com.cloudera.impala.thrift.TShowStatsParams; import com.cloudera.impala.thrift.TTableName; +import com.cloudera.impala.thrift.TUniqueId; import com.cloudera.impala.thrift.TUpdateCatalogCacheRequest; import com.cloudera.impala.thrift.TUpdateMembershipRequest; import com.cloudera.impala.util.GlogAppender; @@ -153,12 +155,28 @@ public class JniFrontend { } } - public byte[] updateCatalogCache(byte[] thriftCatalogUpdate) throws ImpalaException { - TUpdateCatalogCacheRequest req = new TUpdateCatalogCacheRequest(); - JniUtil.deserializeThrift(protocolFactory_, req, thriftCatalogUpdate); + // Deserialize and merge each thrift catalog update into a single merged update + public byte[] updateCatalogCache(byte[][] thriftCatalogUpdates) throws ImpalaException { + TUniqueId defaultCatalogServiceId = new TUniqueId(0L, 0L); + TUpdateCatalogCacheRequest mergedUpdateRequest = new TUpdateCatalogCacheRequest( + false, defaultCatalogServiceId, new ArrayList<TCatalogObject>(), + new ArrayList<TCatalogObject>()); + for (byte[] catalogUpdate: thriftCatalogUpdates) { + TUpdateCatalogCacheRequest incrementalRequest = new TUpdateCatalogCacheRequest(); + JniUtil.deserializeThrift(protocolFactory_, incrementalRequest, catalogUpdate); + mergedUpdateRequest.is_delta |= 
incrementalRequest.is_delta; + if (!incrementalRequest.getCatalog_service_id().equals(defaultCatalogServiceId)) { + mergedUpdateRequest.setCatalog_service_id( + incrementalRequest.getCatalog_service_id()); + } + mergedUpdateRequest.getUpdated_objects().addAll( + incrementalRequest.getUpdated_objects()); + mergedUpdateRequest.getRemoved_objects().addAll( + incrementalRequest.getRemoved_objects()); + } TSerializer serializer = new TSerializer(protocolFactory_); try { - return serializer.serialize(frontend_.updateCatalogCache(req)); + return serializer.serialize(frontend_.updateCatalogCache(mergedUpdateRequest)); } catch (TException e) { throw new InternalException(e.getMessage()); }
