Shant Hovsepian created IMPALA-10236:
----------------------------------------
Summary: Queries Stuck if catalog topic update compression fails
Key: IMPALA-10236
URL: https://issues.apache.org/jira/browse/IMPALA-10236
Project: IMPALA
Issue Type: Bug
Components: Catalog
Affects Versions: Impala 2.12.0
Reporter: Shant Hovsepian
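If a serialized catalog object is too large to compress (its uncompressed size exceeds 0x7E000000 bytes, just under 2 GB), CompressCatalogObject() fails and returns an error status: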
{code:java}
/// Compresses a serialized catalog object using LZ4 and stores it back in 'dst'. Stores
/// the size of the uncompressed catalog object in the first sizeof(uint32_t) bytes of
/// 'dst'. The compression fails if the uncompressed data size exceeds 0x7E000000 bytes.
Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* dst)
    WARN_UNUSED_RESULT;
{code}
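For scale, 0x7E000000 bytes is exactly 2,016 MiB, 32 MiB short of 2 GiB; it is the same value as LZ4_MAX_INPUT_SIZE in lz4.h. The sketch below checks that arithmetic at compile time and shows a size header laid out the way the comment describes. FitsCompressionLimit(), WriteSizeHeader() and ReadSizeHeader() are hypothetical helpers, not Impala functions, and native byte order is an assumption (the real encoding is not shown in this issue).

{code:cpp}
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Limit from the CompressCatalogObject() comment (same value as LZ4_MAX_INPUT_SIZE).
constexpr uint64_t kMaxCompressInput = 0x7E000000;
constexpr uint64_t kTwoGiB = 2ULL * 1024 * 1024 * 1024;

static_assert(kMaxCompressInput == 2113929216ULL, "limit in bytes");
static_assert(kMaxCompressInput / (1024 * 1024) == 2016, "limit is exactly 2016 MiB");
static_assert(kTwoGiB - kMaxCompressInput == 32ULL * 1024 * 1024, "32 MiB short of 2 GiB");

// Hypothetical helper: true if 'size' bytes are within the limit. Per the comment,
// compression fails only when the size exceeds the limit, so equality is allowed.
bool FitsCompressionLimit(uint64_t size) { return size <= kMaxCompressInput; }

// Hypothetical helper: resizes 'dst' to hold just the header and stores the
// uncompressed size in its first sizeof(uint32_t) bytes (native byte order assumed).
void WriteSizeHeader(uint32_t size, std::string* dst) {
  dst->resize(sizeof(uint32_t));
  std::memcpy(&(*dst)[0], &size, sizeof(uint32_t));
}

// Hypothetical helper: reads the size back from the first sizeof(uint32_t) bytes.
uint32_t ReadSizeHeader(const std::string& src) {
  assert(src.size() >= sizeof(uint32_t));
  uint32_t size = 0;
  std::memcpy(&size, src.data(), sizeof(uint32_t));
  return size;
}
{code}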
CatalogServer::AddPendingTopicItem() calls CompressCatalogObject(). The Java catalog reaches AddPendingTopicItem() through this JNI entry point:
{code:java}
// Add a catalog update to pending_topic_updates_.
extern "C"
JNIEXPORT jboolean JNICALL
Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
    jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong version,
    jbyteArray serialized_object, jboolean deleted) {
  std::string key_string;
  {
    JniUtfCharGuard key_str;
    if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
      return static_cast<jboolean>(false);
    }
    key_string.assign(key_str.get());
  }
  JniScopedArrayCritical obj_buf;
  if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
    return static_cast<jboolean>(false);
  }
  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
      AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
      static_cast<uint32_t>(obj_buf.size()), deleted);
  return static_cast<jboolean>(true);
}
{code}
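However, the JNI wrapper disregards the return value of AddPendingTopicItem() and returns true whether or not the item was added. IMPALA-10076 recently changed how this return value is handled: the native call now returns an int, and a negative value signals failure: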
{code:java}
-      if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
-          obj.catalog_version, data, delete)) {
+      int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+          v1Key, obj.catalog_version, data, delete);
+      if (actualSize < 0) {
         LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
             + delete + ", data_size=" + data.length);
+      } else if (summary != null && obj.type == HDFS_PARTITION) {
+        summary.update(true, delete, obj.hdfs_partition.partition_name,
+            obj.catalog_version, data.length, actualSize);
       }
     }
{code}
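A minimal sketch of how the native side could honor that int contract: return a negative value when compression fails instead of discarding the Status. Everything here is hypothetical and simplified: Status is a stand-in struct, FakeCompress() enforces only the 0x7E000000-byte limit and copies the bytes (it does not call LZ4), and AddPendingTopicItemChecked() is an illustrative name, not the function Impala uses.

{code:cpp}
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for Impala's Status class.
struct Status {
  bool ok;
  std::string msg;
};

Status OkStatus() { return {true, ""}; }
Status ErrorStatus(std::string msg) { return {false, std::move(msg)}; }

// Same limit as in the CompressCatalogObject() comment.
constexpr uint32_t kMaxCompressInput = 0x7E000000;

// Hypothetical stand-in for CompressCatalogObject(): it only enforces the size
// limit and copies the input. It does NOT perform LZ4 compression.
Status FakeCompress(const uint8_t* src, uint32_t size, std::string* dst) {
  if (size > kMaxCompressInput) {
    return ErrorStatus("catalog object too large to compress");
  }
  assert(src != nullptr || size == 0);
  dst->assign(reinterpret_cast<const char*>(src), size);
  return OkStatus();
}

// Hypothetical checked variant: returns the stored size on success and -1 on
// failure, matching the "actualSize < 0 means failure" check on the Java side.
int AddPendingTopicItemChecked(const uint8_t* obj, uint32_t size, std::string* out) {
  Status status = FakeCompress(obj, size, out);
  if (!status.ok) return -1;  // surface the failure instead of dropping it
  return static_cast<int>(out->size());
}
{code}

Returning a size rather than a bool lets the Java caller both detect failure with actualSize < 0 and record the stored size, which is what the IMPALA-10076 diff relies on.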
CatalogServiceCatalog::addCatalogObject() logs an error message and moves on, so the catalog update never goes through:
{code:java}
if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
  String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
  byte[] data = serializer.serialize(obj);
  int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
      v1Key, obj.catalog_version, data, delete);
  if (actualSize < 0) {
    LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
        + delete + ", data_size=" + data.length);
  } else if (summary != null && obj.type == HDFS_PARTITION) {
    summary.update(true, delete, obj.hdfs_partition.partition_name,
        obj.catalog_version, data.length, actualSize);
  }
}
{code}
I'm not sure what the right behavior would be. We could handle the compression failure, for example by retrying with more aggressive compression, or unblock the catalog update.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)