This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 77d80aeda IMPALA-11812: Deduplicate column schema in hmsPartitions
77d80aeda is described below

commit 77d80aeda653b3aecb8bc41bf867cc5a84ba1245
Author: stiga-huang <[email protected]>
AuthorDate: Tue Dec 27 12:24:44 2022 +0800

    IMPALA-11812: Deduplicate column schema in hmsPartitions
    
    A list of HMS Partitions will be created in many workloads in catalogd,
    e.g. table loading, bulk altering partitions by ComputeStats or
    AlterTableRecoverPartitions, etc. Currently, each hmsPartition holds a
    unique list of column schema, i.e. a List<FieldSchema>. This results in
    lots of FieldSchema instances if the table is wide and lots of
    partitions need to be loaded/operated. Though the strings of column
    names and comments are interned, the FieldSchema objects could still
    occupy the majority of the heap. See the histogram in JIRA description.
    
    In reality, the hmsPartition instances of a table can share the
    table-level column schema since Impala doesn't respect the partition
    level schema.
    
    This patch replaces the column list in the StorageDescriptor of
    hmsPartitions with the table-level column list to remove the duplication.
    Also add
    some progress logs in batch HMS operations, and avoid misleading logs
    when event-processor is disabled.
    
    Tests:
    - Ran exhaustive tests
    - Add tests on wide table operations that hit OOM errors without this
      fix.
    
    Change-Id: I511ecca0ace8bea4c24a19a54fb0a75390e50c4d
    Reviewed-on: http://gerrit.cloudera.org:8080/19391
    Reviewed-by: Aman Sinha <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 13 ++--
 .../impala/catalog/events/MetastoreEvents.java     |  2 +
 .../impala/catalog/local/DirectMetaProvider.java   |  2 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 42 ++++++----
 .../java/org/apache/impala/util/MetaStoreUtil.java | 86 +++++++++++++++-----
 .../functional/functional_schema_template.sql      | 11 +++
 .../datasets/functional/schema_constraints.csv     |  3 +
 tests/common/custom_cluster_test_suite.py          |  7 +-
 tests/custom_cluster/test_wide_table_operations.py | 91 ++++++++++++++++++++++
 9 files changed, 212 insertions(+), 45 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2ffdf8db0..99a34a877 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1269,7 +1269,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
           // Load all partitions from Hive Metastore, including file metadata.
           List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
               MetaStoreUtil.fetchAllPartitions(
-                  client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+                  client, msTbl, NUM_PARTITION_FETCH_RETRIES);
           LOG.info("Fetched partition metadata from the Metastore: " + 
getFullName());
           storageMetadataLoadTime_ = loadAllPartitions(client, msPartitions, 
msTbl);
           allPartitionsLdContext.stop();
@@ -1574,11 +1574,11 @@ public class HdfsTable extends Table implements 
FeFsTable {
       if (partitionsToUpdate != null) {
         partitionList = MetaStoreUtil
             .fetchPartitionsByName(client, 
Lists.newArrayList(partitionsToUpdate),
-                db_.getName(), name_);
+                msTable_);
       } else {
         partitionList =
             MetaStoreUtil.fetchAllPartitions(
-                client_, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+                client_, msTable_, NUM_PARTITION_FETCH_RETRIES);
       }
       LOG.debug("Time taken to fetch all partitions of table {}: {} msec", 
getFullName(),
           sw.stop().elapsed(TimeUnit.MILLISECONDS));
@@ -1838,6 +1838,8 @@ public class HdfsTable extends Table implements FeFsTable 
{
     }
     addVirtualColumns();
     isSchemaLoaded_ = true;
+    LOG.info("Loaded {} columns from HMS. Actual columns: {}",
+        nonPartFieldSchemas_.size() + numClusteringCols_, colsByPos_.size());
   }
 
   /**
@@ -1858,7 +1860,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
     // Load partition metadata from Hive Metastore.
     List<Partition> msPartitions = new ArrayList<>(
         MetaStoreUtil.fetchPartitionsByName(
-            client, Lists.newArrayList(partitionNames), db_.getName(), name_));
+            client, Lists.newArrayList(partitionNames), msTable_));
     return loadPartitionsFromMetastore(msPartitions, inprogressPartBuilders,
         partitionToEventId, client);
   }
@@ -2756,8 +2758,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
     List<Partition> hmsPartitions;
     Map<Partition, HdfsPartition> hmsPartToHdfsPart = new HashMap<>();
     try {
-      hmsPartitions = client.getPartitionsByNames(getDb().getName(),
-          getName(), partNames);
+      hmsPartitions = MetaStoreUtil.fetchPartitionsByName(client, partNames, 
msTable_);
       for (Partition partition : hmsPartitions) {
         List<LiteralExpr> partExprs = 
getTypeCompatiblePartValues(partition.getValues());
         HdfsPartition hdfsPartition = getPartition(partExprs);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 4bfb2c8a4..44d09150f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -74,6 +74,7 @@ import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.MetaStoreUtil;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 
@@ -1873,6 +1874,7 @@ public class MetastoreEvents {
         // it is possible that the added partitions is empty in certain cases. 
See
         // IMPALA-8847 for example
         msTbl_ = addPartitionMessage_.getTableObj();
+        MetaStoreUtil.replaceSchemaFromTable(addedPartitions_, msTbl_);
         partitionKeyVals_ = new ArrayList<>(addedPartitions_.size());
         for (Partition part : addedPartitions_) {
           partitionKeyVals_.add(getTPartitionSpecFromHmsPartition(msTbl_, 
part));
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java 
b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 20ce56ada..90bdba126 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -236,7 +236,7 @@ class DirectMetaProvider implements MetaProvider {
     List<Partition> parts;
     try (MetaStoreClient c = msClientPool_.getClient()) {
       parts = MetaStoreUtil.fetchPartitionsByName(
-          c.getHiveClient(), partNames, tableImpl.dbName_, 
tableImpl.tableName_);
+          c.getHiveClient(), partNames, tableImpl.msTable_);
     }
 
     // HMS may return fewer partition objects than requested, and the
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ed896c42a..6f3fa24e9 100755
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -174,7 +174,6 @@ import 
org.apache.impala.thrift.TAlterTableSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableType;
 import org.apache.impala.thrift.TAlterTableUnSetTblPropertiesParams;
 import org.apache.impala.thrift.TAlterTableUpdateStatsParams;
-import org.apache.impala.thrift.TBucketInfo;
 import org.apache.impala.thrift.TBucketType;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -4127,7 +4126,8 @@ public class CatalogOpExecutor {
     if (allHmsPartitionsToAdd.isEmpty()) return null;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      Map<String, Long> partitionToEventId = Maps.newHashMap();
+      Map<String, Long> partitionToEventId = 
catalog_.isEventProcessingActive() ?
+          Maps.newHashMap() : null;
       List<Partition> addedHmsPartitions = 
addHmsPartitionsInTransaction(msClient,
           tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists);
       // Handle HDFS cache. This is done in a separate round bacause we have 
to apply
@@ -4142,7 +4142,7 @@ public class CatalogOpExecutor {
         List<Partition> difference = computeDifference(allHmsPartitionsToAdd,
             addedHmsPartitions);
         addedHmsPartitions.addAll(
-            getPartitionsFromHms(msTbl, msClient, tableName, difference));
+            getPartitionsFromHms(msTbl, msClient, difference));
       }
       addHdfsPartitions(msClient, tbl, addedHmsPartitions, partitionToEventId);
     }
@@ -4720,16 +4720,21 @@ public class CatalogOpExecutor {
    */
   private List<Partition> addHmsPartitions(MetaStoreClient msClient,
       Table tbl, List<Partition> allHmsPartitionsToAdd,
-      Map<String, Long> partitionToEventId, boolean ifNotExists)
+      @Nullable Map<String, Long> partitionToEventId, boolean ifNotExists)
       throws ImpalaRuntimeException, CatalogException {
     long eventId = getCurrentEventId(msClient);
     List<Partition> addedHmsPartitions = Lists
         .newArrayListWithCapacity(allHmsPartitionsToAdd.size());
+    long numDone = 0;
     for (List<Partition> hmsSublist :
         Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) 
{
       try {
-        List<Partition> addedPartitions = msClient.getHiveClient()
-            .add_partitions(hmsSublist, ifNotExists, true);
+        List<Partition> addedPartitions = MetaStoreUtil.addPartitions(
+            msClient.getHiveClient(), tbl.getMetaStoreTable(),
+            hmsSublist, ifNotExists, true);
+        numDone += hmsSublist.size();
+        LOG.info("Added {}/{} partitions in HMS for table {}", numDone,
+            allHmsPartitionsToAdd.size(), tbl.getFullName());
         org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable();
         List<NotificationEvent> events = 
getNextMetastoreEventsIfEnabled(eventId,
                 event -> AddPartitionEvent.ADD_PARTITION_EVENT_TYPE
@@ -4748,6 +4753,7 @@ public class CatalogOpExecutor {
           // add_partitions call above.
           addedHmsPartitions.addAll(addedPartitions);
         } else {
+          Preconditions.checkNotNull(partitionToEventId);
           addedHmsPartitions.addAll(partitionToEventSubMap.keySet());
           // we cannot keep a mapping of Partition to event ids because the
           // partition objects are changed later in the cachePartitions code 
path.
@@ -4815,8 +4821,7 @@ public class CatalogOpExecutor {
    */
   private List<Partition> getPartitionsFromHms(
       org.apache.hadoop.hive.metastore.api.Table msTbl, MetaStoreClient 
msClient,
-      TableName tableName, List<Partition> hmsPartitions)
-      throws ImpalaException {
+      List<Partition> hmsPartitions) throws ImpalaException {
     List<String> partitionCols = Lists.newArrayList();
     for (FieldSchema fs: msTbl.getPartitionKeys()) 
partitionCols.add(fs.getName());
 
@@ -4827,8 +4832,8 @@ public class CatalogOpExecutor {
       partitionNames.add(partName);
     }
     try {
-      return msClient.getHiveClient().getPartitionsByNames(tableName.getDb(),
-          tableName.getTbl(), partitionNames);
+      return MetaStoreUtil.fetchPartitionsByName(msClient.getHiveClient(),
+          partitionNames, msTbl);
     } catch (TException e) {
       throw new ImpalaRuntimeException("Metadata inconsistency has occured. 
Please run "
           + "'invalidate metadata <tablename>' to resolve the problem.", e);
@@ -5693,7 +5698,8 @@ public class CatalogOpExecutor {
     }
 
     // Add partitions to metastore.
-    Map<String, Long> partitionToEventId = Maps.newHashMap();
+    Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ?
+        Maps.newHashMap() : null;
     String annotation = String.format("Recovering %d partitions for %s",
         hmsPartitions.size(), tbl.getFullName());
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
@@ -5703,6 +5709,7 @@ public class CatalogOpExecutor {
       addHdfsPartitions(msClient, tbl, addedPartitions, partitionToEventId);
       // Handle HDFS cache.
       if (cachePoolName != null) {
+        int numDone = 0;
         for (List<Partition> hmsSublist :
             Lists.partition(addedPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
           for (Partition partition: hmsSublist) {
@@ -5713,6 +5720,9 @@ public class CatalogOpExecutor {
           // Update the partition metadata to include the cache directive id.
           MetastoreShim.alterPartitions(msClient.getHiveClient(), 
tableName.getDb(),
               tableName.getTbl(), hmsSublist);
+          numDone += hmsSublist.size();
+          LOG.info("Updated cache directive id for {}/{} partitions for table 
{}",
+              numDone, addedPartitions.size(), tableName);
         }
       }
     } catch (TException e) {
@@ -5784,7 +5794,7 @@ public class CatalogOpExecutor {
     partition.setDbName(tableName.getDb());
     partition.setTableName(tableName.getTbl());
     partition.setValues(partitionSpecValues);
-    StorageDescriptor sd = msTbl.getSd().deepCopy();
+    StorageDescriptor sd = 
MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd());
     sd.setLocation(location);
     partition.setSd(sd);
     return partition;
@@ -6131,6 +6141,7 @@ public class CatalogOpExecutor {
 
     String dbName = tbl.getDb().getName();
     String tableName = tbl.getName();
+    int numDone = 0;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
       for (List<Partition> msPartitionsSubList : Iterables.partition(
@@ -6144,6 +6155,9 @@ public class CatalogOpExecutor {
             MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, 
tableName,
                 msPartitionsSubList);
           }
+          numDone += msPartitionsSubList.size();
+          LOG.info("HMS alterPartitions done on {}/{} partitions of table {}", 
numDone,
+              msPartitionToBuilders.size(), tbl.getFullName());
           // Mark the corresponding HdfsPartition objects as dirty
           for (Partition msPartition : msPartitionsSubList) {
             HdfsPartition.Builder partBuilder = 
msPartitionToBuilders.get(msPartition);
@@ -6497,9 +6511,7 @@ public class CatalogOpExecutor {
               partition.setDbName(tblName.getDb());
               partition.setTableName(tblName.getTbl());
               partition.setValues(MetaStoreUtil.getPartValsFromName(msTbl, 
partName));
-              partition.setParameters(new HashMap<String, String>());
-              partition.setSd(msTbl.getSd().deepCopy());
-              
partition.getSd().setSerdeInfo(msTbl.getSd().getSerdeInfo().deepCopy());
+              
partition.setSd(MetaStoreUtil.shallowCopyStorageDescriptor(msTbl.getSd()));
               partition.getSd().setLocation(msTbl.getSd().getLocation() + "/" 
+ partName);
               addCatalogServiceIdentifiers(msTbl, partition);
               MetastoreShim.updatePartitionStatsFast(partition, msTbl, 
warehouse);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java 
b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 6cd6ada23..0e667b4b4 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -17,16 +17,11 @@
 
 package org.apache.impala.util;
 
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import java.util.Collection;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -35,7 +30,6 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.catalog.CatalogException;
@@ -156,16 +150,17 @@ public class MetaStoreUtil {
    * generally mean the connection is broken or has timed out. The HiveClient 
supports
    * configuring retires at the connection level so it can be enabled 
independently.
    */
-  public static List<org.apache.hadoop.hive.metastore.api.Partition> 
fetchAllPartitions(
-      IMetaStoreClient client, String dbName, String tblName, int numRetries)
-      throws MetaException, TException {
+  public static List<Partition> fetchAllPartitions(IMetaStoreClient client, 
Table msTbl,
+      int numRetries) throws TException {
+    String dbName = msTbl.getDbName();
+    String tblName = msTbl.getTableName();
     Preconditions.checkArgument(numRetries >= 0);
     int retryAttempt = 0;
     while (true) {
       try {
         // First, get all partition names that currently exist.
         List<String> partNames = client.listPartitionNames(dbName, tblName, 
(short) -1);
-        return MetaStoreUtil.fetchPartitionsByName(client, partNames, dbName, 
tblName);
+        return MetaStoreUtil.fetchPartitionsByName(client, partNames, msTbl);
       } catch (MetaException e) {
         // Only retry for MetaExceptions, since TExceptions could indicate a 
broken
         // connection which we can't recover from by retrying.
@@ -184,31 +179,80 @@ public class MetaStoreUtil {
   /**
    * Given a List of partition names, fetches the matching Partitions from the 
HMS
    * in batches. Each batch will contain at most 'maxPartsPerRpc' partitions.
-   * Returns a List containing all fetched Partitions.
-   * Will throw a MetaException if any partitions in 'partNames' do not exist.
+   * The partition-level schema will be replaced with the table's to reduce 
memory
+   * footprint (IMPALA-11812).
+   * @return a List containing all fetched Partitions.
+   * @throws MetaException if any partitions in 'partNames' do not exist.
+   * @throws TException for RPC failures.
    */
-  public static List<Partition> fetchPartitionsByName(
-      IMetaStoreClient client, List<String> partNames, String dbName, String 
tblName)
-      throws MetaException, TException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(String.format("Fetching %d partitions for: %s.%s using 
partition " +
-          "batch size: %d", partNames.size(), dbName, tblName, 
maxPartitionsPerRpc_));
-    }
+  public static List<Partition> fetchPartitionsByName(IMetaStoreClient client,
+      List<String> partNames, Table msTbl) throws TException {
+    LOG.info("Fetching {} partitions for: {}.{} using partition batch size: 
{}",
+        partNames.size(), msTbl.getDbName(), msTbl.getTableName(),
+        maxPartitionsPerRpc_);
 
     List<org.apache.hadoop.hive.metastore.api.Partition> fetchedPartitions =
         Lists.newArrayList();
     // Fetch the partitions in batches.
+    int numDone = 0;
     for (int i = 0; i < partNames.size(); i += maxPartitionsPerRpc_) {
       // Get a subset of partition names to fetch.
       List<String> partsToFetch =
           partNames.subList(i, Math.min(i + maxPartitionsPerRpc_, 
partNames.size()));
       // Fetch these partitions from the metastore.
-      fetchedPartitions.addAll(
-          client.getPartitionsByNames(dbName, tblName, partsToFetch));
+      List<Partition> partitions = client.getPartitionsByNames(
+          msTbl.getDbName(), msTbl.getTableName(), partsToFetch);
+      replaceSchemaFromTable(partitions, msTbl);
+      fetchedPartitions.addAll(partitions);
+      numDone += partitions.size();
+      LOG.info("Fetched {}/{} partitions for table {}.{}", numDone, 
partNames.size(),
+          msTbl.getDbName(), msTbl.getTableName());
     }
     return fetchedPartitions;
   }
 
+  /**
+   * A wrapper for HMS add_partitions API to replace the partition-level 
schema with
+   * table's to save memory.
+   * @return a List containing all added Partitions.
+   */
+  public static List<Partition> addPartitions(IMetaStoreClient client, Table 
tbl,
+      List<Partition> partitions, boolean ifNotExists, boolean needResults)
+      throws TException {
+    List<Partition> addedPartitions = client.add_partitions(partitions,
+        ifNotExists, needResults);
+    replaceSchemaFromTable(addedPartitions, tbl);
+    return addedPartitions;
+  }
+
+  /**
+   * Replace the column list in the given partitions with the table level 
schema to save
+   * memory for wide tables (IMPALA-11812). Note that Impala never use the 
partition
+   * level schema.
+   */
+  public static void replaceSchemaFromTable(List<Partition> partitions, Table 
msTbl) {
+    for (Partition p : partitions) {
+      p.getSd().setCols(msTbl.getSd().getCols());
+    }
+  }
+
+  /**
+   * Shallow copy the given StorageDescriptor.
+   */
+  public static StorageDescriptor 
shallowCopyStorageDescriptor(StorageDescriptor other) {
+    return new StorageDescriptor(
+        other.getCols(),
+        other.getLocation(),
+        other.getInputFormat(),
+        other.getOutputFormat(),
+        other.isCompressed(),
+        other.getNumBuckets(),
+        other.getSerdeInfo(),
+        other.getBucketCols(),
+        other.getSortCols(),
+        other.getParameters());
+  }
+
   /**
    * Checks that a given 'property' is short enough for HMS to handle. If not, 
throws an
    * 'AnalysisException' with 'name' as its prefix.
diff --git a/testdata/datasets/functional/functional_schema_template.sql 
b/testdata/datasets/functional/functional_schema_template.sql
index 08cb1a4aa..8ac045275 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2388,6 +2388,17 @@ select * from functional.{table_name};
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+widetable_2000_cols_partitioned
+---- PARTITION_COLUMNS
+p int
+---- COLUMNS
+`${IMPALA_HOME}/testdata/common/widetable.py --get_columns -n 2000
+---- ROW_FORMAT
+delimited fields terminated by ',' escaped by '\\'
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 avro_decimal_tbl
 ---- COLUMNS
 name STRING
diff --git a/testdata/datasets/functional/schema_constraints.csv 
b/testdata/datasets/functional/schema_constraints.csv
index 45d770e85..ad5abee48 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -361,3 +361,6 @@ table_name:alltypestiny_negative, constraint:restrict_to, 
table_format:orc/def/b
 
 table_name:insert_only_minor_compacted, constraint:restrict_to, 
table_format:parquet/none/none
 table_name:insert_only_major_and_minor_compacted, constraint:restrict_to, 
table_format:parquet/none/none
+
+# The table is used in large scale metadata test. File format doesn't matter 
so restrict to text only
+table_name:widetable_2000_cols_partitioned, constraint:restrict_to, 
table_format:text/none/none
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 6e8205532..65a2fe8d5 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -43,6 +43,7 @@ ADMISSIOND_ARGS = 'admissiond_args'
 KUDU_ARGS = 'kudu_args'
 # Additional args passed to the start-impala-cluster script.
 START_ARGS = 'start_args'
+JVM_ARGS = 'jvm_args'
 HIVE_CONF_DIR = 'hive_conf_dir'
 CLUSTER_SIZE = "cluster_size"
 # Default query options passed to the impala daemon command line. Handled 
separately from
@@ -105,7 +106,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @staticmethod
   def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
-      start_args=None, default_query_options=None,
+      start_args=None, default_query_options=None, jvm_args=None,
       impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
       impalad_timeout_s=None, expect_cores=None, reset_ranger=False):
@@ -119,6 +120,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[CATALOGD_ARGS] = catalogd_args
       if start_args is not None:
         func.func_dict[START_ARGS] = start_args.split()
+      if jvm_args is not None:
+        func.func_dict[JVM_ARGS] = jvm_args
       if hive_conf_dir is not None:
         func.func_dict[HIVE_CONF_DIR] = hive_conf_dir
       if kudu_args is not None:
@@ -144,7 +147,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   def setup_method(self, method):
     cluster_args = list()
-    for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS]:
+    for arg in [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS, JVM_ARGS]:
       if arg in method.func_dict:
         cluster_args.append("--%s=%s " % (arg, method.func_dict[arg]))
     if START_ARGS in method.func_dict:
diff --git a/tests/custom_cluster/test_wide_table_operations.py 
b/tests/custom_cluster/test_wide_table_operations.py
new file mode 100644
index 000000000..e1081aff1
--- /dev/null
+++ b/tests/custom_cluster/test_wide_table_operations.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+from subprocess import call
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIf
+
+TBL_NAME = "widetable_2000_cols_partitioned"
+NUM_PARTS = 50000
+
+
[email protected]_hdfs
+class TestWideTableOperations(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive since it takes more than 20 mins')
+    super(TestWideTableOperations, cls).setup_class()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      jvm_args="-Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="
+               + os.getenv("LOG_DIR", "/tmp"))
+  def test_wide_table_operations(self, vector, unique_database):
+    """Regression test for IMPALA-11812. Test DDL/DML operations on wide table.
+    Use a small heap size (2GB) to make sure memory consumption is optimized.
+    Each FieldSchema instance takes 24 bytes in a small heap (<32GB). Without 
the fix,
+    catalogd will hold at least 50,000 (parts) * 2,000 (cols) = 100,000,000 
FieldSchema
+    instances in memory for execDdl or table loading, which already takes more 
than 2GB
+    and will results in OOM failures."""
+    # Create partition dirs and files locally
+    tmp_dir = "/tmp/" + TBL_NAME
+    os.mkdir(tmp_dir)
+    for i in range(NUM_PARTS):
+      part_dir = tmp_dir + "/p=" + str(i)
+      data_file = part_dir + "/data.txt"
+      os.mkdir(part_dir)
+      with open(data_file, 'w') as local_file:
+        local_file.write("true")
+    # Upload files to HDFS
+    hdfs_dir = self._get_table_location("functional." + TBL_NAME, vector)
+    call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", hdfs_dir])
+    # Use 1 replica to save space, 8 threads to speed up
+    call(["hdfs", "dfs", "-Ddfs.replication=1", "-put", "-t", "8", tmp_dir, 
hdfs_dir])
+    # Create a new table so we don't need to drop partitions at the end.
+    # It will be dropped when 'unique_database' is dropped.
+    create_tbl_ddl =\
+        "create external table {db}.{tbl} like functional.{tbl} " \
+        "location '{location}'".format(
+            db=unique_database, tbl=TBL_NAME, location=hdfs_dir)
+    self.execute_query_expect_success(
+        self.client, create_tbl_ddl.format(db=unique_database, tbl=TBL_NAME))
+
+    # Recover partitions first. This takes 10mins for 50k partitions.
+    recover_stmt = "alter table {db}.{tbl} recover partitions"
+    # Invalidate the table to test initial metadata loading
+    invalidate_stmt = "invalidate metadata {db}.{tbl}"
+    # Test initial table loading and get all partitions
+    show_parts_stmt = "show partitions {db}.{tbl}"
+    try:
+      self.execute_query_expect_success(
+          self.client, recover_stmt.format(db=unique_database, tbl=TBL_NAME))
+      self.execute_query_expect_success(
+          self.client, invalidate_stmt.format(db=unique_database, 
tbl=TBL_NAME))
+      res = self.execute_query_expect_success(
+          self.client, show_parts_stmt.format(db=unique_database, 
tbl=TBL_NAME))
+      # Last line is 'Total'
+      assert len(res.data) == NUM_PARTS + 1
+    finally:
+      call(["rm", "-rf", tmp_dir])
+      call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", hdfs_dir])

Reply via email to