[ 
https://issues.apache.org/jira/browse/IMPALA-10236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shant Hovsepian updated IMPALA-10236:
-------------------------------------
    Description: 
If a to-be-compressed Catalog Object doesn't fit into a 2GB buffer, an error is 
thrown. 

 
{code:java}
/// Compresses a serialized catalog object using LZ4 and stores it back in 
'dst'. Stores
/// the size of the uncompressed catalog object in the first sizeof(uint32_t) 
bytes of
/// 'dst'. The compression fails if the uncompressed data size exceeds 
0x7E000000 bytes.
Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* 
dst)
    WARN_UNUSED_RESULT;

{code}
 

CatalogServer::AddPendingTopicItem() calls CompressCatalogObject()

 
{code:java}
// Add a catalog update to pending_topic_updates_.
extern "C"
JNIEXPORT jboolean JNICALL
Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
    jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong 
version,
    jbyteArray serialized_object, jboolean deleted) {
  std::string key_string;
  {
    JniUtfCharGuard key_str;
    if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
      return static_cast<jboolean>(false);
    }
    key_string.assign(key_str.get());
  }
  JniScopedArrayCritical obj_buf;
  if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
    return static_cast<jboolean>(false);
  }
  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
      AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
      static_cast<uint32_t>(obj_buf.size()), deleted);
  return static_cast<jboolean>(true);
}

{code}
However the JNI call to AddPendingTopicItem discards the return value.

Recently the return value was maintained due to IMPALA-10076:
{code:java}
-        if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
-            obj.catalog_version, data, delete)) {
+        int actualSize = 
FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+            v1Key, obj.catalog_version, data, delete);
+        if (actualSize < 0) {
           LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + 
", delete="
               + delete + ", data_size=" + data.length);
+        } else if (summary != null && obj.type == HDFS_PARTITION) {
+          summary.update(true, delete, obj.hdfs_partition.partition_name,
+              obj.catalog_version, data.length, actualSize);
         }
       }
{code}
CatalogServiceCatalog::addCatalogObject() now produces an error message but the 
Catalog update doesn't go through.
{code:java}
      if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
        String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
        byte[] data = serializer.serialize(obj);
        int actualSize = 
FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
            v1Key, obj.catalog_version, data, delete);
        if (actualSize < 0) {
          LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", 
delete="
              + delete + ", data_size=" + data.length);
        } else if (summary != null && obj.type == HDFS_PARTITION) {
          summary.update(true, delete, obj.hdfs_partition.partition_name,
              obj.catalog_version, data.length, actualSize);
        }
      }

{code}
Not sure what the right behavior would be: we could handle the compression 
issue and try more aggressive compression, or unblock the catalog update.

 

  was:
If a compressed Catalog Object doesn't fit into a 2GB buffer, an error is 
thrown. 

 
{code:java}
/// Compresses a serialized catalog object using LZ4 and stores it back in 
'dst'. Stores
/// the size of the uncompressed catalog object in the first sizeof(uint32_t) 
bytes of
/// 'dst'. The compression fails if the uncompressed data size exceeds 
0x7E000000 bytes.
Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* 
dst)
    WARN_UNUSED_RESULT;

{code}
 

CatalogServer::AddPendingTopicItem() calls CompressCatalogObject()

 
{code:java}
// Add a catalog update to pending_topic_updates_.
extern "C"
JNIEXPORT jboolean JNICALL
Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
    jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong 
version,
    jbyteArray serialized_object, jboolean deleted) {
  std::string key_string;
  {
    JniUtfCharGuard key_str;
    if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
      return static_cast<jboolean>(false);
    }
    key_string.assign(key_str.get());
  }
  JniScopedArrayCritical obj_buf;
  if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
    return static_cast<jboolean>(false);
  }
  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
      AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
      static_cast<uint32_t>(obj_buf.size()), deleted);
  return static_cast<jboolean>(true);
}

{code}
However the JNI call to AddPendingTopicItem disregards the return value.

Recently the return value was maintained due to IMPALA-10076:
{code:java}
-        if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
-            obj.catalog_version, data, delete)) {
+        int actualSize = 
FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+            v1Key, obj.catalog_version, data, delete);
+        if (actualSize < 0) {
           LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + 
", delete="
               + delete + ", data_size=" + data.length);
+        } else if (summary != null && obj.type == HDFS_PARTITION) {
+          summary.update(true, delete, obj.hdfs_partition.partition_name,
+              obj.catalog_version, data.length, actualSize);
         }
       }
{code}
CatalogServiceCatalog::addCatalogObject() produces an error message but the 
Catalog update doesn't go through.
{code:java}
      if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
        String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
        byte[] data = serializer.serialize(obj);
        int actualSize = 
FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
            v1Key, obj.catalog_version, data, delete);
        if (actualSize < 0) {
          LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", 
delete="
              + delete + ", data_size=" + data.length);
        } else if (summary != null && obj.type == HDFS_PARTITION) {
          summary.update(true, delete, obj.hdfs_partition.partition_name,
              obj.catalog_version, data.length, actualSize);
        }
      }

{code}
Not sure what the right behavior would be, we could handle the compression 
issue and try more aggressive compression, or unblock the catalog update.

 


> Queries stuck if catalog topic update compression fails
> -------------------------------------------------------
>
>                 Key: IMPALA-10236
>                 URL: https://issues.apache.org/jira/browse/IMPALA-10236
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Catalog
>    Affects Versions: Impala 2.12.0
>            Reporter: Shant Hovsepian
>            Priority: Critical
>              Labels: hang, supportability
>
> If a to-be-compressed Catalog Object doesn't fit into a 2GB buffer, an error 
> is thrown. 
>  
> {code:java}
> /// Compresses a serialized catalog object using LZ4 and stores it back in 
> 'dst'. Stores
> /// the size of the uncompressed catalog object in the first sizeof(uint32_t) 
> bytes of
> /// 'dst'. The compression fails if the uncompressed data size exceeds 
> 0x7E000000 bytes.
> Status CompressCatalogObject(const uint8_t* src, uint32_t size, std::string* 
> dst)
>     WARN_UNUSED_RESULT;
> {code}
>  
> CatalogServer::AddPendingTopicItem() calls CompressCatalogObject()
>  
> {code:java}
> // Add a catalog update to pending_topic_updates_.
> extern "C"
> JNIEXPORT jboolean JNICALL
> Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* 
> env,
>     jclass caller_class, jlong native_catalog_server_ptr, jstring key, jlong 
> version,
>     jbyteArray serialized_object, jboolean deleted) {
>   std::string key_string;
>   {
>     JniUtfCharGuard key_str;
>     if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
>       return static_cast<jboolean>(false);
>     }
>     key_string.assign(key_str.get());
>   }
>   JniScopedArrayCritical obj_buf;
>   if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
>     return static_cast<jboolean>(false);
>   }
>   reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
>       AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
>       static_cast<uint32_t>(obj_buf.size()), deleted);
>   return static_cast<jboolean>(true);
> }
> {code}
> However the JNI call to AddPendingTopicItem discards the return value.
> Recently the return value was maintained due to IMPALA-10076:
> {code:java}
> -        if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, 
> v1Key,
> -            obj.catalog_version, data, delete)) {
> +        int actualSize = 
> FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
> +            v1Key, obj.catalog_version, data, delete);
> +        if (actualSize < 0) {
>            LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + 
> ", delete="
>                + delete + ", data_size=" + data.length);
> +        } else if (summary != null && obj.type == HDFS_PARTITION) {
> +          summary.update(true, delete, obj.hdfs_partition.partition_name,
> +              obj.catalog_version, data.length, actualSize);
>          }
>        }
> {code}
> CatalogServiceCatalog::addCatalogObject() now produces an error message but 
> the Catalog update doesn't go through.
> {code:java}
>       if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
>         String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
>         byte[] data = serializer.serialize(obj);
>         int actualSize = 
> FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
>             v1Key, obj.catalog_version, data, delete);
>         if (actualSize < 0) {
>           LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + 
> ", delete="
>               + delete + ", data_size=" + data.length);
>         } else if (summary != null && obj.type == HDFS_PARTITION) {
>           summary.update(true, delete, obj.hdfs_partition.partition_name,
>               obj.catalog_version, data.length, actualSize);
>         }
>       }
> {code}
> Not sure what the right behavior would be: we could handle the compression 
> issue and try more aggressive compression, or unblock the catalog update.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to