This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b37f4509f IMPALA-14089: Support REFRESH on multiple partitions
b37f4509f is described below

commit b37f4509fa03359be77bd7966e40cb2ffd1ec3e4
Author: stiga-huang <[email protected]>
AuthorDate: Thu May 22 18:33:29 2025 +0800

    IMPALA-14089: Support REFRESH on multiple partitions
    
    Currently we just support REFRESH on the whole table or a specific
    partition:
      REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, 
key_col2=val2...])]
    
    If users want to refresh multiple partitions, they have to submit
    multiple statements each for a single partition. This has some
    drawbacks:
     - It requires holding the table write lock inside catalogd multiple
       times, which increase lock contention with other read/write
       operations on the same table, e.g. getPartialCatalogObject requests
       from coordinators.
     - Catalog version of the table will be increased multiple times.
       Coordinators in local catalog mode is more likely to see different
       versions between their getPartialCatalogObject requests so have to
       retry planning to resolve InconsistentMetadataFetchException.
     - Partitions are reloaded in sequence. They should be reloaded in
       parallel like we do in refreshing the whole table.
    
    This patch extends the syntax to refresh multiple partitions in one
    statement:
      REFRESH [db_name.]table_name
      [PARTITION (key_col1=val1 [, key_col2=val2...])
       [PARTITION (key_col1=val3 [, key_col2=val4...])...]]
    Example:
      REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2);
    
    TResetMetadataRequest is extended to have a list of partition specs for
    this. If the list has only one item, we still use the existing logic of
    reloading a specific partition. If the list has more than one item,
    partitions will be reloaded in parallel. This is implemented in
    CatalogServiceCatalog#reloadTable(). Previously it always invokes
    HdfsTable#load() with partitionsToUpdate=null. Now the parameter is
    set when TResetMetadataRequest has the partition list.
    
    HMS notification events in RELOAD type will be fired for each partition
    if enable_reload_events is turned on. Once HIVE-28967 is resolved, we
    can fire a single event for multiple partitions.
    
    Updated docs in impala_refresh.xml.
    
    Tests:
     - Added FE and e2e tests
    
    Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e
    Reviewed-on: http://gerrit.cloudera.org:8080/22938
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 common/thrift/CatalogService.thrift                |  9 ++-
 docs/impala_keydefs.ditamap                        |  1 +
 docs/topics/impala_refresh.xml                     | 16 +++++-
 .../org/apache/impala/compat/MetastoreShim.java    |  2 +-
 .../org/apache/impala/compat/MetastoreShim.java    | 37 ++++++++----
 fe/src/main/cup/sql-parser.cup                     | 20 ++++++-
 .../apache/impala/analysis/ResetMetadataStmt.java  | 43 ++++++++------
 .../impala/catalog/CatalogServiceCatalog.java      | 26 ++++++---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 21 +++++--
 .../apache/impala/service/CatalogOpExecutor.java   | 55 +++++++++++-------
 .../java/org/apache/impala/util/CatalogOpUtil.java |  4 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   | 16 +++++-
 .../org/apache/impala/analysis/ParserTest.java     |  4 ++
 .../impala/analysis/StmtMetadataLoaderTest.java    |  5 +-
 .../java/org/apache/impala/analysis/ToSqlTest.java |  2 +
 .../events/MetastoreEventsProcessorTest.java       |  2 +-
 .../org/apache/impala/service/FrontendTest.java    |  2 +
 .../org/apache/impala/util/CatalogOpUtilTest.java  |  4 ++
 tests/custom_cluster/test_events_custom_configs.py | 17 ++++--
 tests/metadata/test_refresh_partition.py           | 67 ++++++++++++++++++++--
 20 files changed, 268 insertions(+), 85 deletions(-)

diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index 261fb7365..51379682f 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -325,8 +325,9 @@ struct TResetMetadataRequest {
   // the entire catalog
   4: optional CatalogObjects.TTableName table_name
 
-  // If set, refreshes the specified partition, otherwise
-  // refreshes the whole table
+  // Deprecated - use partition_spec_list instead. Keeps this for 
compatibility.
+  // If set, refreshes the specified partition.
+  // Refreshes the whole table if both this and partition_spec_list are not 
set.
   5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
 
   // If set, refreshes functions in the specified database.
@@ -344,6 +345,10 @@ struct TResetMetadataRequest {
 
   // debug_action is set from the query_option when available.
   10: optional string debug_action
+
+  // If set, refreshes the specified list of partitions
+  // Refreshes the whole table if both this and partition_spec are not set.
+  11: optional list<list<CatalogObjects.TPartitionKeyValue>> 
partition_spec_list
 }
 
 // Response from TResetMetadataRequest
diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap
index af4c41682..46ec6b856 100644
--- a/docs/impala_keydefs.ditamap
+++ b/docs/impala_keydefs.ditamap
@@ -10604,6 +10604,7 @@ under the License.
   <keydef keys="impala132"><topicmeta><keywords><keyword>Impala 
1.3.2</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala130"><topicmeta><keywords><keyword>Impala 
1.3.0</keyword></keywords></topicmeta></keydef>
 
+  <keydef keys="impala50_full"><topicmeta><keywords><keyword>Impala 
5.0</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala42_full"><topicmeta><keywords><keyword>Impala 
4.2</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala34_full"><topicmeta><keywords><keyword>Impala 
3.4</keyword></keywords></topicmeta></keydef>
   <keydef keys="impala33_full"><topicmeta><keywords><keyword>Impala 
3.3</keyword></keywords></topicmeta></keydef>
diff --git a/docs/topics/impala_refresh.xml b/docs/topics/impala_refresh.xml
index 4e43b1e2d..dcc959044 100644
--- a/docs/topics/impala_refresh.xml
+++ b/docs/topics/impala_refresh.xml
@@ -67,7 +67,9 @@ under the License.
 
     <p conref="../shared/impala_common.xml#common/syntax_blurb"/>
 
-<codeblock rev="IMPALA-1683">REFRESH 
[<varname>db_name</varname>.]<varname>table_name</varname> [PARTITION 
(<varname>key_col1</varname>=<varname>val1</varname> [, 
<varname>key_col2</varname>=<varname>val2</varname>...])]</codeblock>
+<codeblock rev="IMPALA-1683">REFRESH 
[<varname>db_name</varname>.]<varname>table_name</varname>
+[PARTITION (<varname>key_col1</varname>=<varname>val1</varname> [, 
<varname>key_col2</varname>=<varname>val2</varname>...])
+  [PARTITION (<varname>key_col1</varname>=<varname>val3</varname> [, 
<varname>key_col2</varname>=<varname>val4</varname>...])...]</codeblock>
 
     <p conref="../shared/impala_common.xml#common/usage_notes_blurb"/>
 
@@ -115,7 +117,7 @@ under the License.
     <p conref="../shared/impala_common.xml#common/refresh_vs_invalidate"/>
 
     <p rev="IMPALA-1683">
-      <b>Refreshing a single partition:</b>
+      <b>Refreshing specific partitions:</b>
     </p>
 
     <p rev="IMPALA-1683">
@@ -125,6 +127,13 @@ under the License.
       values for each of the partition key columns.
     </p>
 
+    <p>
+      In <keyword keyref="impala50_full"/> and higher, the 
<codeph>REFRESH</codeph> statement
+      can apply to multiple partitions at a time, rather than a single 
partition. Use the
+      optional <codeph>PARTITION (<varname>partition_spec</varname>)</codeph> 
clause for each
+      each of the partition.
+    </p>
+
     <p>
       The following rules apply:
       <ul>
@@ -164,6 +173,9 @@ refresh p2 partition (z=1, y=0)
 -- Incomplete partition spec causes an error.
 refresh p2 partition (y=0)
 ERROR: AnalysisException: Items in partition spec must exactly match the 
partition columns in the table definition: default.p2 (1 vs 2)
+
+-- Refresh multiple partitions.
+refresh p2 partition (y=0, z=3) partition (y=1, z=0) partition (y=1, z=2);
 ]]>
 </codeblock>
 
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index e6ac112e7..ac0a0657c 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -435,7 +435,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
    */
   @VisibleForTesting
   public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
-      boolean isRefresh, List<String> partVals, String dbName, String 
tableName,
+      boolean isRefresh, List<List<String>> partVals, String dbName, String 
tableName,
       Map<String, String> selfEventParams) throws TException {
     throw new UnsupportedOperationException("Reload event is not supported.");
   }
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 78a0e4f85..f2c71fab8 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -524,15 +524,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
    * @param msClient Metastore client,
    * @param isRefresh if this flag is set to true then it is a refresh query, 
else it
    *                  is an invalidate metadata query.
-   * @param partVals The partition list corresponding to
-   *                                 the table, used by Apache Hive 3
+   * @param partValsList partition values (List<String>) for each partition
    * @param dbName
    * @param tableName
    * @return a list of eventIds for the reload events
    */
   @VisibleForTesting
   public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
-      boolean isRefresh, List<String> partVals, String dbName, String 
tableName,
+      boolean isRefresh, List<List<String>> partValsList, String dbName, 
String tableName,
       Map<String, String> selfEventParams) throws TException {
     Preconditions.checkNotNull(msClient);
     Preconditions.checkNotNull(dbName);
@@ -542,16 +541,32 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
     FireEventRequest rqst = new FireEventRequest(true, data);
     rqst.setDbName(dbName);
     rqst.setTableName(tableName);
-    rqst.setPartitionVals(partVals);
     rqst.setTblParams(selfEventParams);
-    FireEventResponse response = 
msClient.getHiveClient().fireListenerEvent(rqst);
-    if (!response.isSetEventIds()) {
-      LOG.error("FireEventResponse does not have event ids set for table 
{}.{}. This "
-              + "may cause the table to unnecessarily be refreshed when the " +
-              "refresh/invalidate event is received.", dbName, tableName);
-      return Collections.emptyList();
+    if (partValsList == null || partValsList.isEmpty()) {
+      FireEventResponse response = 
msClient.getHiveClient().fireListenerEvent(rqst);
+      if (!response.isSetEventIds()) {
+        LOG.error("FireEventResponse does not have event ids set for table 
{}.{}. This "
+            + "may cause the table to unnecessarily be refreshed when the " +
+            "refresh/invalidate event is received.", dbName, tableName);
+        return Collections.emptyList();
+      }
+      return response.getEventIds();
     }
-    return response.getEventIds();
+    List<Long> eventIds = new ArrayList<>();
+    // TODO: Fire one event once HIVE-28967 is resolved.
+    for (List<String> partVals : partValsList) {
+      rqst.setPartitionVals(partVals);
+      FireEventResponse response = 
msClient.getHiveClient().fireListenerEvent(rqst);
+      if (!response.isSetEventIds()) {
+        LOG.error("FireEventResponse does not have event ids set for table 
{}.{} " +
+            "partition {}. This may cause the table to unnecessarily be 
refreshed " +
+            "when the refresh/invalidate event is received.",
+            dbName, tableName, partVals);
+        continue;
+      }
+      eventIds.addAll(response.getEventIds());
+    }
+    return eventIds;
   }
 
   /**
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 553a1a599..16c197b6b 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -504,6 +504,8 @@ nonterminal PartitionSpec partition_spec;
 nonterminal PartitionSet opt_partition_set;
 // Required partition set
 nonterminal PartitionSet partition_set;
+// Optional partition spec list
+nonterminal List<PartitionSpec> partition_spec_list;
 nonterminal List<PartitionKeyValue> partition_clause;
 nonterminal List<PartitionKeyValue> static_partition_key_value_list;
 nonterminal List<PartitionKeyValue> partition_key_value_list;
@@ -845,8 +847,8 @@ reset_metadata_stmt ::=
   {: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :}
   | KW_REFRESH table_name:table
   {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table); :}
-  | KW_REFRESH table_name:table partition_spec:partition
-  {: RESULT = ResetMetadataStmt.createRefreshPartitionStmt(table, partition); 
:}
+  | KW_REFRESH table_name:table partition_spec_list:partitions
+  {: RESULT = ResetMetadataStmt.createRefreshPartitionsStmt(table, 
partitions); :}
   | KW_REFRESH KW_FUNCTIONS ident_or_unreserved:db
   {: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
   | KW_REFRESH KW_AUTHORIZATION
@@ -2774,6 +2776,20 @@ opt_partition_spec ::=
   {: RESULT = null; :}
   ;
 
+partition_spec_list ::=
+  partition_spec:item
+  {:
+    List<PartitionSpec> list = new ArrayList<>();
+    list.add(item);
+    RESULT = list;
+  :}
+  | partition_spec_list:list partition_spec:item
+  {:
+    list.add(item);
+    RESULT = list;
+  :}
+  ;
+
 static_partition_key_value_list ::=
   static_partition_key_value:item
   {:
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index d980ccba7..a75693f89 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -68,8 +68,8 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
   // database functions.
   private TableName tableName_;
 
-  // not null when refreshing a single partition
-  private final PartitionSpec partitionSpec_;
+  // not null when refreshing specified partitions
+  private final List<PartitionSpec> partitionSpecList_;
 
   // not null when refreshing functions in a database.
   private final String database_;
@@ -87,13 +87,15 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
   private String clientIp_;
 
   private ResetMetadataStmt(Action action, String db, TableName tableName,
-      PartitionSpec partitionSpec) {
+      List<PartitionSpec> partitionSpecList) {
     Preconditions.checkNotNull(action);
     action_ = action;
     database_ = db;
     tableName_ = tableName;
-    partitionSpec_ = partitionSpec;
-    if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_);
+    partitionSpecList_ = partitionSpecList;
+    if (partitionSpecList_ != null) {
+      partitionSpecList_.forEach(p -> p.setTableName(tableName_));
+    }
   }
 
   public static ResetMetadataStmt createInvalidateStmt() {
@@ -111,10 +113,11 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
         Preconditions.checkNotNull(tableName), /*partition*/ null);
   }
 
-  public static ResetMetadataStmt createRefreshPartitionStmt(TableName 
tableName,
-      PartitionSpec partitionSpec) {
+  public static ResetMetadataStmt createRefreshPartitionsStmt(TableName 
tableName,
+      List<PartitionSpec> partitionSpecList) {
     return new ResetMetadataStmt(Action.REFRESH_PARTITION, /*db*/ null,
-        Preconditions.checkNotNull(tableName), 
Preconditions.checkNotNull(partitionSpec));
+        Preconditions.checkNotNull(tableName),
+        Preconditions.checkNotNull(partitionSpecList));
   }
 
   public static ResetMetadataStmt createRefreshFunctionsStmt(String db) {
@@ -132,8 +135,6 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
   @Override
   public TableName getTableName() { return tableName_; }
 
-  public PartitionSpec getPartitionSpec() { return partitionSpec_; }
-
   @VisibleForTesting
   protected Action getAction() { return action_; }
 
@@ -144,7 +145,7 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
 
   @Override
   public void collectTableRefs(List<TableRef> tblRefs) {
-    if (tableName_ != null && partitionSpec_ != null) {
+    if (tableName_ != null && partitionSpecList_ != null) {
       tblRefs.add(new TableRef(tableName_.toPath(), null));
     }
   }
@@ -172,20 +173,22 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
             throw new AnalysisException(Analyzer.TBL_DOES_NOT_EXIST_ERROR_MSG +
                 tableName_);
           }
-          if (partitionSpec_ != null) {
+          if (partitionSpecList_ != null) {
             try {
               // Get local table info without reaching out to HMS
               FeTable table = analyzer.getTable(dbName, tableName_.getTbl(),
                   /* must_exist */ true);
               if (AcidUtils.isTransactionalTable(table)) {
-                throw new AnalysisException("Refreshing a partition is not 
allowed on " +
+                throw new AnalysisException("Refreshing partitions is not 
allowed on " +
                     "transactional tables. Try to refresh the whole table 
instead.");
               }
             } catch (TableLoadingException e) {
               throw new AnalysisException(e);
             }
-            partitionSpec_.setPrivilegeRequirement(Privilege.ANY);
-            partitionSpec_.analyze(analyzer);
+            for (PartitionSpec ps : partitionSpecList_) {
+              ps.setPrivilegeRequirement(Privilege.ANY);
+              ps.analyze(analyzer);
+            }
           }
         } else {
           FeTable tbl = analyzer.getTableNoThrow(dbName, tableName_.getTbl());
@@ -244,8 +247,8 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
         result.append("REFRESH ").append(tableName_.toSql());
         break;
       case REFRESH_PARTITION:
-        result.append("REFRESH ").append(tableName_.toSql()).append(" ")
-            .append(partitionSpec_.toSql(options));
+        result.append("REFRESH ").append(tableName_.toSql());
+        partitionSpecList_.forEach(ps -> result.append(" 
").append(ps.toSql(options)));
         break;
       case INVALIDATE_METADATA_ALL:
         result.append("INVALIDATE METADATA");
@@ -270,7 +273,11 @@ public class ResetMetadataStmt extends StatementBase 
implements SingleTableStmt
     if (tableName_ != null) {
       params.setTable_name(new TTableName(tableName_.getDb(), 
tableName_.getTbl()));
     }
-    if (partitionSpec_ != null) 
params.setPartition_spec(partitionSpec_.toThrift());
+    if (partitionSpecList_ != null) {
+      for (PartitionSpec ps : partitionSpecList_) {
+        params.addToPartition_spec_list(ps.toThrift());
+      }
+    }
     if (database_ != null) params.setDb_name(database_);
     if (action_ == Action.REFRESH_AUTHORIZATION) {
       params.setAuthorization(true);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0e40679dc..dfeadefd2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -2863,7 +2863,7 @@ public class CatalogServiceCatalog extends Catalog {
       CatalogObject.ThriftObjectType resultType, String reason, long eventId,
       boolean isSkipFileMetadataReload, EventSequence catalogTimeline)
       throws CatalogException {
-    LOG.info(String.format("Refreshing table metadata: %s", 
tbl.getFullName()));
+    LOG.info("Refreshing table metadata: {}", tbl.getFullName());
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
     String tblName = tbl.getName();
@@ -2905,10 +2905,20 @@ public class CatalogServiceCatalog extends Catalog {
               dbName + "." + tblName, e);
         }
         if (tbl instanceof HdfsTable) {
+          Set<String> partitionsToUpdate = null;
+          if (request.getPartition_spec_listSize() > 0) {
+            // Convert to partition names and deduplicate partition specs.
+            Map<String, List<TPartitionKeyValue>> partName2PartSpec = new 
HashMap<>();
+            for (List<TPartitionKeyValue> partSpec : 
request.getPartition_spec_list()) {
+              
partName2PartSpec.put(HdfsTable.constructPartitionName(partSpec), partSpec);
+            }
+            partitionsToUpdate = partName2PartSpec.keySet();
+            request.setPartition_spec_list(new 
ArrayList<>(partName2PartSpec.values()));
+          }
           ((HdfsTable) tbl)
               .load(true, msClient.getHiveClient(), msTbl, 
!isSkipFileMetadataReload,
                   /* loadTableSchema*/true, 
request.refresh_updated_hms_partitions,
-                  /* partitionsToUpdate*/null, request.debug_action,
+                  partitionsToUpdate, request.debug_action,
                   /*partitionToEventId*/null, reason, catalogTimeline);
         } else {
           tbl.load(true, msClient.getHiveClient(), msTbl, reason, 
catalogTimeline);
@@ -3505,8 +3515,8 @@ public class CatalogServiceCatalog extends Catalog {
       String reason, long newCatalogVersion, @Nullable HdfsPartition 
hdfsPartition,
       EventSequence catalogTimeline) throws CatalogException {
     Preconditions.checkState(hdfsTable.isWriteLockedByCurrentThread());
-    LOG.info(String.format("Refreshing partition metadata: %s %s (%s)",
-        hdfsTable.getFullName(), partitionName, reason));
+    LOG.info("Refreshing partition metadata: {} {} ({})",
+        hdfsTable.getFullName(), partitionName, reason);
     try (MetaStoreClient msClient = getMetaStoreClient(catalogTimeline)) {
       org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
       try {
@@ -3522,9 +3532,9 @@ public class CatalogServiceCatalog extends Catalog {
           // non-existing partition was dropped from catalog, so we mark it as 
refreshed
           wasPartitionReloaded.setRef(true);
         } else {
-          LOG.info(String.format("Partition metadata for %s was not refreshed 
since "
+          LOG.info("Partition metadata for {} {} was not refreshed since "
                   + "it does not exist in metastore anymore",
-              hdfsTable.getFullName() + " " + partitionName));
+              hdfsTable.getFullName(), partitionName);
         }
         return hdfsTable.toTCatalogObject(resultType);
       } catch (Exception e) {
@@ -3540,8 +3550,8 @@ public class CatalogServiceCatalog extends Catalog {
     }
     hdfsTable.setCatalogVersion(newCatalogVersion);
     wasPartitionReloaded.setRef(true);
-    LOG.info(String.format("Refreshed partition metadata: %s %s",
-        hdfsTable.getFullName(), partitionName));
+    LOG.info("Refreshed partition metadata: {} {}", hdfsTable.getFullName(),
+        partitionName);
     return hdfsTable.toTCatalogObject(resultType);
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index d1618f162..06bbcb3ca 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1498,7 +1498,11 @@ public class HdfsTable extends Table implements 
FeFsTable {
      * hive metastore.
      * @return true if partition does not exist in metastore, else false.
      */
-    public abstract boolean isRemoved(HdfsPartition hdfsPartition);
+    public boolean isRemoved(HdfsPartition hdfsPartition) {
+      return isRemoved(hdfsPartition.getPartitionName());
+    }
+
+    public abstract boolean isRemoved(String partName);
 
     /**
      * Loads any partitions which are known to metastore but not provided in
@@ -1566,9 +1570,14 @@ public class HdfsTable extends Table implements 
FeFsTable {
       loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, 
addedPartitions);
       // If a list of modified partitions (old and new) is specified, don't 
reload file
       // metadata for the new ones as they have already been detected in HMS 
and have been
-      // reloaded by loadNewPartitions().
+      // reloaded by loadNewPartitions(). Also ignore partitions that don't 
exist in HMS.
       if (partitionsToUpdate_ != null) {
         partitionsToUpdate_.removeAll(addedPartitions);
+        int orgSize = partitionsToUpdate_.size();
+        if (partitionsToUpdate_.removeIf(this::isRemoved)) {
+          LOG.info("Ignored {} non-existing partitions of table {}",
+              orgSize - partitionsToUpdate_.size(), getFullName());
+        }
       }
       // Load file metadata. Until we have a notification mechanism for when a
       // file changes in hdfs, it is sometimes required to reload all the file
@@ -1638,8 +1647,8 @@ public class HdfsTable extends Table implements FeFsTable 
{
     }
 
     @Override
-    public boolean isRemoved(HdfsPartition hdfsPartition) {
-      return !msPartitions_.containsKey(hdfsPartition.getPartitionName());
+    public boolean isRemoved(String partName) {
+      return !msPartitions_.containsKey(partName);
     }
 
     /**
@@ -1732,8 +1741,8 @@ public class HdfsTable extends Table implements FeFsTable 
{
     }
 
     @Override
-    public boolean isRemoved(HdfsPartition hdfsPartition) {
-      return 
!partitionNamesFromHms_.contains(hdfsPartition.getPartitionName());
+    public boolean isRemoved(String partName) {
+      return !partitionNamesFromHms_.contains(partName);
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index ed5c1e8cb..c9a986cb0 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -7158,12 +7158,14 @@ public class CatalogOpExecutor {
                   CatalogObject.ThriftObjectType.INVALIDATION :
                   CatalogObject.ThriftObjectType.FULL;
           if (isTableLoadedInCatalog) {
-            if (req.isSetPartition_spec()) {
+            if (req.isSetPartition_spec() || req.getPartition_spec_listSize() 
== 1) {
               
Preconditions.checkArgument(!AcidUtils.isTransactionalTable(tbl));
+              List<TPartitionKeyValue> partitionSpec = 
req.isSetPartition_spec() ?
+                  req.getPartition_spec() : 
req.getPartition_spec_list().get(0);
               Reference<Boolean> wasPartitionRefreshed = new 
Reference<>(false);
               // TODO if the partition was not really refreshed because the 
partSpec
               // was wrong, do we still need to send back the table?
-              updatedThriftTable = catalog_.reloadPartition(tbl, 
req.getPartition_spec(),
+              updatedThriftTable = catalog_.reloadPartition(tbl, partitionSpec,
                   wasPartitionRefreshed, resultType, cmdString, 
catalogTimeline);
             } else {
               // TODO IMPALA-8809: Optimisation for partitioned tables:
@@ -7264,15 +7266,26 @@ public class CatalogOpExecutor {
    */
   private void fireReloadEventAndUpdateRefreshEventId(
       TResetMetadataRequest req, TableName tblName, Table tbl) {
-    List<String> partVals = null;
+    // Partition spec (List<TPartitionKeyValue>) for each partition
+    List<List<TPartitionKeyValue>> partSpecList = null;
+    // Partition values (List<String>) for each partition
+    List<List<String>> partValsList = null;
     if (req.isSetPartition_spec()) {
-      partVals = req.getPartition_spec().stream().
-          map(TPartitionKeyValue::getValue).collect(Collectors.toList());
+      partSpecList = Collections.singletonList(req.partition_spec);
+    } else if (req.isSetPartition_spec_list()) {
+      partSpecList = req.partition_spec_list;
+    }
+    if (partSpecList != null) {
+      partValsList = partSpecList.stream()
+          .map(ps -> ps.stream()
+              .map(TPartitionKeyValue::getValue)
+              .collect(Collectors.toList()))
+          .collect(Collectors.toList());
     }
     try {
       List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
-          catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, 
tblName.getDb(),
-          tblName.getTbl(), Collections.emptyMap());
+          catalog_.getMetaStoreClient(), req.isIs_refresh(), partValsList,
+          tblName.getDb(), tblName.getTbl(), Collections.emptyMap());
       LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(),
           tbl.getFullName(), StringUtils.join(",", eventIds));
       // Update the lastRefreshEventId accordingly
@@ -7283,19 +7296,21 @@ public class CatalogOpExecutor {
             tbl.getFullName());
         return;
       }
-      if (req.isSetPartition_spec()) {
-        HdfsTable hdfsTbl = (HdfsTable) tbl;
-        HdfsPartition partition = hdfsTbl
-            .getPartitionFromThriftPartitionSpec(req.getPartition_spec());
-        if (partition != null) {
-          HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
-          partBuilder.setLastRefreshEventId(eventIds.get(0));
-          hdfsTbl.updatePartition(partBuilder);
-        } else {
-          LOG.warn("Partition {} no longer exists in table {}. It might be " +
-              "dropped by a concurrent operation.",
-              FeCatalogUtils.getPartitionName(hdfsTbl, partVals),
-              hdfsTbl.getFullName());
+      if (partSpecList != null) {
+        for (int i = 0; i < partSpecList.size(); ++i) {
+          HdfsTable hdfsTbl = (HdfsTable) tbl;
+          HdfsPartition partition = hdfsTbl
+              .getPartitionFromThriftPartitionSpec(partSpecList.get(i));
+          if (partition != null) {
+            HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+            partBuilder.setLastRefreshEventId(eventIds.get(0));
+            hdfsTbl.updatePartition(partBuilder);
+          } else {
+            LOG.warn("Partition {} no longer exists in table {}. It might be " 
+
+                    "dropped by a concurrent operation.",
+                FeCatalogUtils.getPartitionName(hdfsTbl, partValsList.get(i)),
+                hdfsTbl.getFullName());
+          }
         }
       } else {
         tbl.setLastRefreshEventId(eventIds.get(0));
diff --git a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java 
b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
index be7ee3e5e..c62005d45 100644
--- a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java
@@ -139,7 +139,9 @@ public class CatalogOpUtil {
       cmd += "DATABASE " + req.getDb_name();
     } else if (req.isSetTable_name()) {
       cmd += "TABLE " + TableName.fromThrift(req.getTable_name());
-      if (req.isSetPartition_spec()) cmd += " PARTITIONS";
+      if (req.isSetPartition_spec() || req.isSetPartition_spec_list()) {
+        cmd += " PARTITIONS";
+      }
     } else if (req.isAuthorization()) {
       cmd += "AUTHORIZATION";
     } else {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 397885a12..19f889c45 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -430,6 +430,10 @@ public class AnalyzerTest extends FrontendTestBase {
     assertAction.accept(AnalyzesOk(
         "refresh functional.alltypessmall partition (year=2009, month=NULL)"),
         ResetMetadataStmt.Action.REFRESH_PARTITION);
+    assertAction.accept(AnalyzesOk(
+            "refresh functional.alltypessmall partition (year=2009, month=1)" +
+                " partition (year=2010, month=2) partition (year=2010, 
month=4)"),
+        ResetMetadataStmt.Action.REFRESH_PARTITION);
     assertAction.accept(AnalyzesOk(
         "refresh authorization", 
createAnalysisCtx(createAuthorizationFactory())),
         ResetMetadataStmt.Action.REFRESH_AUTHORIZATION);
@@ -456,6 +460,10 @@ public class AnalyzerTest extends FrontendTestBase {
         "refresh functional.alltypessmall partition (year=2009, month='foo')",
         "Value of partition spec (column=month) has incompatible type: 
'STRING'. "
             + "Expected type: 'INT'");
+    AnalysisError("refresh functional.alltypessmall partition (year=2009)" +
+            " partition (month=1)",
+        "Items in partition spec must exactly match the partition columns in "
+            + "the table definition: functional.alltypessmall (1 vs 2)");
     AnalysisError("refresh functional.zipcode_incomes partition (year=2009, 
month=1)",
         "Table is not partitioned: functional.zipcode_incomes");
     AnalysisError(
@@ -636,10 +644,14 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalyzesOk("refresh functional.insert_only_transactional_table");
     AnalyzesOk("refresh functional_orc_def.full_transactional_table");
     AnalysisError("refresh functional.insert_only_transactional_table 
partition (j=1)",
-        "Refreshing a partition is not allowed on transactional tables. Try to 
refresh " +
+        "Refreshing partitions is not allowed on transactional tables. Try to 
refresh " +
         "the whole table instead.");
+    AnalysisError("refresh functional.insert_only_transactional_table 
partition (j=1) " +
+            "partition (j=2)",
+        "Refreshing partitions is not allowed on transactional tables. Try to 
refresh " +
+            "the whole table instead.");
     AnalysisError("refresh functional_orc_def.full_transactional_table 
partition (j=1)",
-        "Refreshing a partition is not allowed on transactional tables. Try to 
refresh " +
+        "Refreshing partitions is not allowed on transactional tables. Try to 
refresh " +
         "the whole table instead.");
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index b2b50e697..caf288361 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3651,8 +3651,11 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("refresh Foo");
     ParsesOk("refresh Foo.S");
     ParsesOk("refresh Foo partition (col=2)");
+    ParsesOk("refresh Foo partition (col=2) partition (col=3)");
     ParsesOk("refresh Foo.S partition (col=2)");
     ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)");
+    ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3) " +
+        "partition (col1 = 0, col2 = 0) partition (col1 = 1, col2 = 1)");
     ParsesOk("refresh functions Foo");
     ParsesOk("refresh authorization");
 
@@ -3664,6 +3667,7 @@ public class ParserTest extends FrontendTestBase {
     ParserError("refresh");
     ParserError("refresh Foo.S partition (col1 = 2, col2)");
     ParserError("refresh Foo.S partition ()");
+    ParserError("refresh Foo.S partition (col1 = 0), (col1 = 1)");
     ParserError("refresh functions Foo.S");
     ParserError("refresh authorization Foo");
   }
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java 
b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
index cc1e43804..f5a4ef93d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
@@ -232,9 +232,12 @@ public class StmtMetadataLoaderTest {
     testNoLoad("refresh functions functional");
     testNoLoad("refresh authorization");
 
-    // This stmt requires the table to be loaded.
+    // These stmts require the table to be loaded.
     testLoadTables("refresh functional.alltypes partition (year=2009, 
month=1)", 1, 1,
         new String[] {"default", "functional"}, new String[] 
{"functional.alltypes"});
+    testLoadTables("refresh functional.alltypes partition (year=2009, month=1) 
" +
+            "partition (year=2010, month=2)", 1, 1,
+        new String[] {"default", "functional"}, new String[] 
{"functional.alltypes"});
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index f32d4b806..138562db4 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -1891,6 +1891,8 @@ public class ToSqlTest extends FrontendTestBase {
   public void testRefresh() {
     testToSql("REFRESH functional.alltypes");
     testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)");
+    testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1) " +
+        "PARTITION (year=2010, month=2)");
     testToSql("REFRESH FUNCTIONS functional");
     testToSql(createAnalysisCtx(createAuthorizationFactory()), "REFRESH 
AUTHORIZATION");
   }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index a1a26e449..091a2bfaa 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -4000,7 +4000,7 @@ public class MetastoreEventsProcessorTest {
       eventsProcessor_.processEvents();
       // Fire a reload event and process partition with empty values
       MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true,
-          Arrays.asList("1"), TEST_DB_NAME, tblName, Collections.emptyMap());
+          partVals, TEST_DB_NAME, tblName, Collections.emptyMap());
       
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
       processEventsAndVerifyStatus(prevFlag);
       // insert partition event
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java 
b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index ee53947eb..e20586ed4 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -425,6 +425,8 @@ public class FrontendTest extends FrontendTestBase {
         db, Arrays.asList(db), Arrays.asList(db + ".foo"));
     TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1)",
         db, Arrays.asList("mydb"), Arrays.asList("mydb.foo"));
+    TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1) 
PARTITION (p=2)",
+        db, Arrays.asList("mydb"), Arrays.asList("mydb.foo"));
     TestCollectRequiredObjectsHelper("REFRESH foo PARTITION (p=1)",
         db, Arrays.asList(db), Arrays.asList(db + ".foo"));
 
diff --git a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java 
b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
index 823fd6de1..3849201d1 100644
--- a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java
@@ -96,6 +96,10 @@ public class CatalogOpUtilTest {
     req.setPartition_spec(Collections.emptyList());
     assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice",
         CatalogOpUtil.getShortDescForReset(req));
+    req.unsetPartition_spec();
+    req.setPartition_spec_list(Collections.emptyList());
+    assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice",
+        CatalogOpUtil.getShortDescForReset(req));
   }
 
   @Test
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 612bb2a28..6897424d1 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -592,18 +592,18 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
         .format(unique_database, test_reload_table))
     EventProcessorUtils.wait_for_event_processing(self)
 
-    def check_self_events(query):
+    def check_self_events(query, num_events=1):
       tbls_refreshed_before, partitions_refreshed_before, \
           events_skipped_before = self._get_self_event_metrics()
       last_event_id = 
EventProcessorUtils.get_current_notification_id(self.hive_client)
       self.client.execute(query)
       # Check if there is a reload event fired after refresh query.
       events = EventProcessorUtils.get_next_notification(self.hive_client, 
last_event_id)
-      assert len(events) == 1
-      last_event = events[0]
-      assert last_event.dbName == unique_database
-      assert last_event.tableName == test_reload_table
-      assert last_event.eventType == "RELOAD"
+      for event in events:
+        assert event.dbName == unique_database
+        assert event.tableName == test_reload_table
+        assert event.eventType == "RELOAD"
+      assert len(events) == num_events
       EventProcessorUtils.wait_for_event_processing(self)
       tbls_refreshed_after, partitions_refreshed_after, \
           events_skipped_after = self._get_self_event_metrics()
@@ -612,6 +612,11 @@ class 
TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
     check_self_events("refresh {}.{} partition(year=2022)"
         .format(unique_database, test_reload_table))
     check_self_events("refresh {}.{}".format(unique_database, 
test_reload_table))
+    # Refresh multiple partitions. The last two are the same. Verify only two 
RELOAD
+    # events are generated.
+    check_self_events(
+        "refresh {}.{} partition(year=2022) partition(year=2023) 
partition(year=2023)"
+        .format(unique_database, test_reload_table), 2)
     EventProcessorUtils.wait_for_event_processing(self)
 
     if enable_sync_to_latest_event_on_ddls:
diff --git a/tests/metadata/test_refresh_partition.py 
b/tests/metadata/test_refresh_partition.py
index 18d1fbd69..8dc1d18dc 100644
--- a/tests/metadata/test_refresh_partition.py
+++ b/tests/metadata/test_refresh_partition.py
@@ -59,6 +59,10 @@ class TestRefreshPartition(ImpalaTestSuite):
     self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
     assert [('333', '5309')] == self.get_impala_partition_info(table_name, 
'y', 'z')
     assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
+    self.client.execute(
+        'refresh %s partition (y=71, z=8857) partition (y=0, z=0)' % 
table_name)
+    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 
'y', 'z')
+    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
 
   def test_remove_data_and_refresh(self, unique_database):
     """
@@ -91,6 +95,31 @@ class TestRefreshPartition(ImpalaTestSuite):
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str('0')]
 
+    # Test multiple partitions
+    self.client.execute(
+        'insert into table %s partition (y, z) values '
+        '(2, 33, 444), (3, 44, 555), (4, 55, 666)' % table_name)
+    result = self.client.execute('select * from %s' % table_name)
+    assert '2\t33\t444' in result.data
+    assert '3\t44\t555' in result.data
+    assert '4\t55\t666' in result.data
+    assert len(result.data) == 3
+    # Drop two partitions in Hive
+    self.run_stmt_in_hive(
+      'alter table %s drop partition (y>33)' % table_name)
+    # Query the table. With file handle caching, this may not produce an error,
+    # because the file handles are still open in the cache. If the system does
+    # produce an error, it should be the expected error.
+    try:
+      self.client.execute("select * from %s" % table_name)
+    except IMPALA_CONNECTION_EXCEPTION as e:
+      assert expected_error in str(e)
+    self.client.execute(
+        'refresh %s partition (y=33, z=444) partition (y=44, z=555) '
+        'partition (y=55, z=666)' % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == ['1']
+
   def test_add_delete_data_to_hdfs_and_refresh(self, unique_database):
     """
     Data added/deleted directly in HDFS is visible in impala after refresh of
@@ -106,8 +135,9 @@ class TestRefreshPartition(ImpalaTestSuite):
       create table %s like functional.alltypes stored as parquet
       location '%s'
     """ % (table_name, table_location))
-    self.client.execute("alter table %s add partition (year=2010, month=1)" %
-        table_name)
+    for month in range(1, 5):
+      self.client.execute("alter table %s add partition (year=2010, month=%d)" 
%
+                          (table_name, month))
     self.client.execute("refresh %s" % table_name)
     # Check that there is no data in table
     result = self.client.execute("select count(*) from %s" % table_name)
@@ -127,6 +157,29 @@ class TestRefreshPartition(ImpalaTestSuite):
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(0)]
 
+    # Test multiple partitions
+    for month in range(2, 5):
+      dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, 
file_name)
+      self.filesystem_client.copy(src_file, dst_path, overwrite=True)
+    # Check that data added is not visible before refresh
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == ['0']
+    # Chech that data is visible after refresh
+    self.client.execute(
+      "refresh %s partition (year=2010, month=2) partition (year=2010, 
month=3) "
+      "partition (year=2010, month=4)" % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(file_num_rows * 3)]
+    # Check that after deleting the file and refreshing, it returns zero rows
+    for month in range(2, 5):
+      dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, 
file_name)
+      check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
+    self.client.execute(
+      "refresh %s partition (year=2010, month=2) partition (year=2010, 
month=3) "
+      "partition (year=2010, month=4)" % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == ['0']
+
   def test_confirm_individual_refresh(self, unique_database):
     """
     Data added directly to HDFS is only visible for the partition refreshed
@@ -141,7 +194,7 @@ class TestRefreshPartition(ImpalaTestSuite):
       create table %s like functional.alltypes stored as parquet
       location '%s'
     """ % (table_name, table_location))
-    for month in [1, 2]:
+    for month in range(1, 6):
         self.client.execute("alter table %s add partition (year=2010, 
month=%s)" %
         (table_name, month))
     self.client.execute("refresh %s" % table_name)
@@ -149,7 +202,7 @@ class TestRefreshPartition(ImpalaTestSuite):
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(0)]
     dst_path = table_location + "/year=2010/month=%s/" + file_name
-    for month in [1, 2]:
+    for month in range(1, 6):
         self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
     # Check that data added is not visible before refresh
     result = self.client.execute("select count(*) from %s" % table_name)
@@ -168,3 +221,9 @@ class TestRefreshPartition(ImpalaTestSuite):
     self.client.execute("refresh %s partition (year=2010, month=2)" % 
table_name)
     result = self.client.execute("select count(*) from %s" % table_name)
     assert result.data == [str(file_num_rows * 2)]
+    # Refresh multiple partitions
+    self.client.execute(
+        "refresh %s partition (year=2010, month=3) partition (year=2010, 
month=4) "
+        "partition (year=2010, month=5)" % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(file_num_rows * 5)]


Reply via email to