IMPALA-1683: Allow REFRESH on a single partition

Previously, the only way to refresh metadata for a partition was to refresh
the whole table. This is a relatively time-consuming process, especially if
there are many partitions and only one of them needs to be refreshed.
This patch allows the client to REFRESH on a single partition by using the
following syntax:
REFRESH [database_name.]table_name PARTITION (partition_spec)

Testing:
Added parsing and authorization tests in ParserTest.java and
AuthorizationTest.java respectively. A new test file
"test_refresh_partition.py" was added for testing functionality.

Performance:
For a table with 10000 partitions and 1 file per partition

                     execResetMetadata()       Total Execution Time

Refresh Table              3795 ms                   4630 ms

Refresh Partition            42 ms                    680 ms

The time spent in execResetMetadata() improves by a factor of about 90x
(3795 ms vs. 42 ms), but because of about 640 ms of overhead outside
execResetMetadata() in this case, the improvement in total execution time
is about 7x (4630 ms vs. 680 ms). As the size of the table and the number
of partitions increase, this improvement would be more significant.

Change-Id: Ia9aa25d190ada367fbebaca47ae8b2cafbea16fb
Reviewed-on: http://gerrit.cloudera.org:8080/3813
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/36b4ea6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/36b4ea6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/36b4ea6f

Branch: refs/heads/master
Commit: 36b4ea6f651e904a62f4f4f8cab8c05a8f1b20dd
Parents: dd33e8f
Author: Bikramjeet Vig <[email protected]>
Authored: Tue Jun 14 16:25:17 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Fri Jul 29 23:57:50 2016 +0000

----------------------------------------------------------------------
 common/thrift/CatalogService.thrift             |   4 +
 fe/src/main/cup/sql-parser.cup                  |   8 +-
 .../impala/analysis/AnalysisContext.java        |   6 +-
 .../impala/analysis/ResetMetadataStmt.java      |  18 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  49 ++++
 .../com/cloudera/impala/catalog/HdfsTable.java  |  32 +++
 .../impala/service/CatalogOpExecutor.java       |   7 +-
 .../cloudera/impala/analysis/AnalyzerTest.java  |  15 ++
 .../impala/analysis/AuthorizationTest.java      |  10 +
 .../cloudera/impala/analysis/ParserTest.java    |   7 +
 testdata/workloads/tpch/tpch_core.csv           |   1 -
 tests/metadata/test_refresh_partition.py        | 257 +++++++++++++++++++
 12 files changed, 406 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index c2f2cec..87de568 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -186,6 +186,10 @@ struct TResetMetadataRequest {
   // Fully qualified name of the table to refresh or invalidate; not set if 
invalidating
   // the entire catalog
   3: optional CatalogObjects.TTableName table_name
+
+  // If set, refreshes the specified partition, otherwise
+  // refreshes the whole table
+  5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
 }
 
 // Response from TResetMetadataRequest

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index e0f86c9..00ee242 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -621,11 +621,13 @@ overwrite_val ::=
 
 reset_metadata_stmt ::=
   KW_INVALIDATE KW_METADATA
-  {: RESULT = new ResetMetadataStmt(null, false); :}
+  {: RESULT = new ResetMetadataStmt(null, false, null); :}
   | KW_INVALIDATE KW_METADATA table_name:table
-  {: RESULT = new ResetMetadataStmt(table, false); :}
+  {: RESULT = new ResetMetadataStmt(table, false, null); :}
   | KW_REFRESH table_name:table
-  {: RESULT = new ResetMetadataStmt(table, true); :}
+  {: RESULT = new ResetMetadataStmt(table, true, null); :}
+  | KW_REFRESH table_name:table partition_spec:partition
+  {: RESULT = new ResetMetadataStmt(table, true, partition); :}
   ;
 
 explain_stmt ::=

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java 
b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java
index 098501c..043747d 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/AnalysisContext.java
@@ -389,7 +389,8 @@ public class AnalysisContext {
       throws AuthorizationException, InternalException {
     Preconditions.checkNotNull(analysisResult_);
     Analyzer analyzer = getAnalyzer();
-    // Process statements for which column-level privilege requests may be 
registered.
+    // Process statements for which column-level privilege requests may be 
registered
+    // except for DESCRIBE TABLE or REFRESH/INVALIDATE statements
     if (analysisResult_.isQueryStmt() || analysisResult_.isInsertStmt() ||
         analysisResult_.isUpdateStmt() || analysisResult_.isDeleteStmt() ||
         analysisResult_.isCreateTableAsSelectStmt() ||
@@ -434,7 +435,8 @@ public class AnalysisContext {
       for (PrivilegeRequest privReq: analyzer.getPrivilegeReqs()) {
         Preconditions.checkState(
             !(privReq.getAuthorizeable() instanceof AuthorizeableColumn) ||
-            analysisResult_.isDescribeTableStmt());
+            analysisResult_.isDescribeTableStmt() ||
+            analysisResult_.isResetMetadataStmt());
         authorizePrivilegeRequest(authzChecker, privReq);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/java/com/cloudera/impala/analysis/ResetMetadataStmt.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/com/cloudera/impala/analysis/ResetMetadataStmt.java 
b/fe/src/main/java/com/cloudera/impala/analysis/ResetMetadataStmt.java
index 24656f6..b2834ee 100644
--- a/fe/src/main/java/com/cloudera/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/com/cloudera/impala/analysis/ResetMetadataStmt.java
@@ -21,6 +21,7 @@ import com.cloudera.impala.common.AnalysisException;
 import com.cloudera.impala.thrift.TResetMetadataRequest;
 import com.cloudera.impala.thrift.TTableName;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 /**
  * Representation of a REFRESH/INVALIDATE METADATA statement.
@@ -32,10 +33,17 @@ public class ResetMetadataStmt extends StatementBase {
   // true if it is a REFRESH statement.
   private final boolean isRefresh_;
 
-  public ResetMetadataStmt(TableName name, boolean isRefresh) {
+  // not null when refreshing a single partition
+  private final PartitionSpec partitionSpec_;
+
+  public ResetMetadataStmt(TableName name, boolean isRefresh,
+      PartitionSpec partitionSpec) {
     Preconditions.checkArgument(!isRefresh || name != null);
+    Preconditions.checkArgument(isRefresh || partitionSpec == null);
     this.tableName_ = name;
     this.isRefresh_ = isRefresh;
+    this.partitionSpec_ = partitionSpec;
+    if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_);
   }
 
   public TableName getTableName() { return tableName_; }
@@ -57,6 +65,10 @@ public class ResetMetadataStmt extends StatementBase {
           // to Impala.
           throw new AnalysisException(Analyzer.TBL_DOES_NOT_EXIST_ERROR_MSG + 
tableName_);
         }
+        if (partitionSpec_ != null) {
+          partitionSpec_.setPrivilegeRequirement(Privilege.ANY);
+          partitionSpec_.analyze(analyzer);
+        }
       } else {
         // Verify the user has privileges to access this table.
         analyzer.registerPrivReq(new PrivilegeRequestBuilder()
@@ -77,6 +89,7 @@ public class ResetMetadataStmt extends StatementBase {
     }
 
     if (tableName_ != null) result.append(" ").append(tableName_);
+    if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql());
     return result.toString();
   }
 
@@ -86,6 +99,9 @@ public class ResetMetadataStmt extends StatementBase {
     if (tableName_ != null) {
       params.setTable_name(new TTableName(tableName_.getDb(), 
tableName_.getTbl()));
     }
+    if (partitionSpec_ != null) {
+      params.setPartition_spec(partitionSpec_.toThrift());
+    }
     return params;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
index dddc545..2e0e411 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/CatalogServiceCatalog.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
@@ -1167,4 +1168,52 @@ public class CatalogServiceCatalog extends Catalog {
   public TableId getNextTableId() { return new 
TableId(nextTableId_.getAndIncrement()); }
   public SentryProxy getSentryProxy() { return sentryProxy_; }
   public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
+
+  /**
+   * Reloads metadata for the partition defined by the partition spec
+   * 'partitionSpec' in table 'tbl'. Returns the table object with partition
+   * metadata reloaded
+   */
+  public Table reloadPartition(Table tbl, List<TPartitionKeyValue> 
partitionSpec)
+      throws CatalogException {
+    catalogLock_.writeLock().lock();
+    synchronized (tbl) {
+      long newCatalogVersion = incrementAndGetCatalogVersion();
+      catalogLock_.writeLock().unlock();
+      HdfsTable hdfsTable = (HdfsTable) tbl;
+      HdfsPartition hdfsPartition = hdfsTable
+          .getPartitionFromThriftPartitionSpec(partitionSpec);
+      // Retrieve partition name from existing partition or construct it from
+      // the partition spec
+      String partitionName = hdfsPartition == null
+          ? HdfsTable.constructPartitionName(partitionSpec)
+          : hdfsPartition.getPartitionName();
+      LOG.debug(String.format("Refreshing Partition metadata: %s %s",
+          hdfsTable.getFullName(), partitionName));
+      MetaStoreClient msClient = getMetaStoreClient();
+      try {
+        org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
+        try {
+          hmsPartition = msClient.getHiveClient().getPartition(
+              hdfsTable.getDb().getName(), hdfsTable.getName(), partitionName);
+        } catch (NoSuchObjectException e) {
+          // If partition does not exist in Hive Metastore, remove it from the
+          // catalog
+          if (hdfsPartition != null) {
+            hdfsTable.dropPartition(partitionSpec);
+            hdfsTable.setCatalogVersion(newCatalogVersion);
+          }
+          return hdfsTable;
+        } catch (Exception e) {
+          throw new CatalogException("Error loading metadata for partition: "
+              + hdfsTable.getFullName() + " " + partitionName, e);
+        }
+        hdfsTable.reloadPartition(hdfsPartition, hmsPartition);
+      } finally {
+        msClient.release();
+      }
+      hdfsTable.setCatalogVersion(newCatalogVersion);
+      return hdfsTable;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java 
b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
index 1457bb6..dafcafa 100644
--- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsTable.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.util.StringUtils;
@@ -85,6 +86,7 @@ import com.cloudera.impala.util.MetaStoreUtil;
 import com.cloudera.impala.util.TAccessLevelUtil;
 import com.cloudera.impala.util.TResultRowBuilder;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1920,4 +1922,34 @@ public class HdfsTable extends Table {
     }
     return result;
   }
+
+  /**
+   * Constructs a partition name from a list of TPartitionKeyValue objects.
+   */
+  public static String constructPartitionName(List<TPartitionKeyValue> 
partitionSpec) {
+    List<String> partitionCols = Lists.newArrayList();
+    List<String> partitionVals = Lists.newArrayList();
+    for (TPartitionKeyValue kv: partitionSpec) {
+      partitionCols.add(kv.getName());
+      partitionVals.add(kv.getValue());
+    }
+    return org.apache.hadoop.hive.common.FileUtils.makePartName(partitionCols,
+        partitionVals);
+  }
+
+  /**
+   * Reloads the metadata of partition 'oldPartition' by removing
+   * it from the table and reconstructing it from the HMS partition object
+   * 'hmsPartition'. If old partition is null then nothing is removed and
+   * and partition constructed from 'hmsPartition' is simply added.
+   */
+  public void reloadPartition(HdfsPartition oldPartition, Partition 
hmsPartition)
+      throws CatalogException {
+    HdfsPartition refreshedPartition = createPartition(
+        hmsPartition.getSd(), hmsPartition);
+    Preconditions.checkArgument(oldPartition == null
+        || oldPartition.compareTo(refreshedPartition) == 0);
+    dropPartition(oldPartition);
+    addPartition(refreshedPartition);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
index a63427a..8cc433c 100644
--- a/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/com/cloudera/impala/service/CatalogOpExecutor.java
@@ -2749,7 +2749,12 @@ public class CatalogOpExecutor {
         if (tbl == null) {
           modifiedObjects.second = null;
         } else {
-          modifiedObjects.second = catalog_.reloadTable(tbl);
+          if (req.isSetPartition_spec()) {
+            modifiedObjects.second = catalog_.reloadPartition(tbl,
+                req.getPartition_spec());
+          } else {
+            modifiedObjects.second = catalog_.reloadTable(tbl);
+          }
         }
       } else {
         wasRemoved = catalog_.invalidateTable(req.getTable_name(), 
modifiedObjects);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java 
b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
index 6c03352..f7ee934 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzerTest.java
@@ -645,6 +645,8 @@ public class AnalyzerTest {
     AnalyzesOk("refresh functional.alltypessmall");
     AnalyzesOk("refresh functional.alltypes_view");
     AnalyzesOk("refresh functional.bad_serde");
+    AnalyzesOk("refresh functional.alltypessmall partition (year=2009, 
month=1)");
+    AnalyzesOk("refresh functional.alltypessmall partition (year=2009, 
month=NULL)");
 
     // invalidate metadata <table name> checks the Hive Metastore for table 
existence
     // and should not throw an AnalysisError if the table or db does not exist.
@@ -655,6 +657,19 @@ public class AnalyzerTest {
         "Table does not exist: functional.unknown_table");
     AnalysisError("refresh unknown_db.unknown_table",
         "Database does not exist: unknown_db");
+    AnalysisError("refresh functional.alltypessmall partition (year=2009, 
int_col=10)",
+        "Column 'int_col' is not a partition column in table: 
functional.alltypessmall");
+    AnalysisError("refresh functional.alltypessmall partition (year=2009)",
+        "Items in partition spec must exactly match the partition columns in "
+            + "the table definition: functional.alltypessmall (1 vs 2)");
+    AnalysisError("refresh functional.alltypessmall partition (year=2009, 
year=2009)",
+        "Duplicate partition key name: year");
+    AnalysisError(
+        "refresh functional.alltypessmall partition (year=2009, month='foo')",
+        "Value of partition spec (column=month) has incompatible type: 
'STRING'. "
+            + "Expected type: 'INT'");
+    AnalysisError("refresh functional.zipcode_incomes partition (year=2009, 
month=1)",
+        "Table is not partitioned: functional.zipcode_incomes");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java 
b/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java
index 9f371c7..7bb675b 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/AuthorizationTest.java
@@ -791,6 +791,10 @@ public class AuthorizationTest {
     AuthzOk("refresh functional.alltypesagg");
     AuthzOk("invalidate metadata functional.view_view");
     AuthzOk("refresh functional.view_view");
+    // Positive cases for checking refresh partition
+    AuthzOk("refresh functional.alltypesagg partition (year=2010, month=1, 
day=1)");
+    AuthzOk("refresh functional.alltypes partition (year=2009, month=1)");
+    AuthzOk("refresh functional_seq_snap.alltypes partition (year=2009, 
month=1)");
 
     AuthzError("invalidate metadata unknown_db.alltypessmall",
         "User '%s' does not have privileges to access: 
unknown_db.alltypessmall");
@@ -815,6 +819,12 @@ public class AuthorizationTest {
 
     AuthzError("invalidate metadata",
         "User '%s' does not have privileges to access: server");
+    AuthzError(
+        "refresh functional_rc.alltypesagg partition (year=2010, month=1, 
day=1)",
+        "User '%s' does not have privileges to access: 
functional_rc.alltypesagg");
+    AuthzError(
+        "refresh functional_rc.alltypesagg partition (year=2010, month=1, 
day=9999)",
+        "User '%s' does not have privileges to access: 
functional_rc.alltypesagg");
 
     // TODO: Add test support for dynamically changing privileges for
     // file-based policy.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java 
b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
index aa58056..f506c3a 100644
--- a/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/com/cloudera/impala/analysis/ParserTest.java
@@ -2751,11 +2751,18 @@ public class ParserTest {
     ParsesOk("invalidate metadata Foo.S");
     ParsesOk("refresh Foo");
     ParsesOk("refresh Foo.S");
+    ParsesOk("refresh Foo partition (col=2)");
+    ParsesOk("refresh Foo.S partition (col=2)");
+    ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)");
 
     ParserError("invalidate");
     ParserError("invalidate metadata Foo.S.S");
+    ParserError("invalidate metadata partition (col=2)");
+    ParserError("invalidate metadata Foo.S partition (col=2)");
     ParserError("REFRESH Foo.S.S");
     ParserError("refresh");
+    ParserError("refresh Foo.S partition (col1 = 2, col2)");
+    ParserError("refresh Foo.S partition ()");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/testdata/workloads/tpch/tpch_core.csv
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/tpch_core.csv 
b/testdata/workloads/tpch/tpch_core.csv
index 86804ac..04d7896 100644
--- a/testdata/workloads/tpch/tpch_core.csv
+++ b/testdata/workloads/tpch/tpch_core.csv
@@ -7,4 +7,3 @@ file_format:rc, dataset:tpch, compression_codec:none, 
compression_type:none
 file_format:avro, dataset:tpch, compression_codec: none, compression_type: none
 file_format:avro, dataset:tpch, compression_codec: snap, compression_type: 
block
 file_format:parquet, dataset:tpch, compression_codec: none, compression_type: 
none
-file_format:kudu, dataset:tpch, compression_codec: none, compression_type: none

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/36b4ea6f/tests/metadata/test_refresh_partition.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_refresh_partition.py 
b/tests/metadata/test_refresh_partition.py
new file mode 100644
index 0000000..92f9a19
--- /dev/null
+++ b/tests/metadata/test_refresh_partition.py
@@ -0,0 +1,257 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from subprocess import check_call
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
+from tests.util.filesystem_utils import get_fs_path
+
+
[email protected]
[email protected]
[email protected]
+class TestRefreshPartition(ImpalaTestSuite):
+  """
+  This class tests the functionality to refresh a partition individually
+  for a table in HDFS
+  """
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRefreshPartition, cls).add_test_dimensions()
+
+    # There is no reason to run these tests using all dimensions.
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.TestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def impala_partition_names(self, table_name):
+    """
+    Find the names of the partitions of a table, as Impala sees them.
+    The return format is a list of lists of strings. Each string represents
+    a partition value of a given column.
+    """
+    rows = self.client.execute('show partitions %s' %
+                               table_name).get_data().split('\n')
+    """
+    According to the output of 'show partitions' query, the first (n-8)
+    columns are the columns on which the table is partitioned
+    """
+    return [row.split('\t')[0:-8] for row in rows[:-1]]
+
+  def hive_partition_names(self, table_name):
+    """
+    Find the names of the partitions of a table, as Hive sees them.
+    The return format is a list of strings. Each string represents a partition
+    value of a given column in a format like 'column1=7/column2=8'.
+    """
+    return self.run_stmt_in_hive(
+        'show partitions %s' % table_name).split('\n')[1:-1]
+
+  def test_add_hive_partition_and_refresh(self, vector, unique_database):
+    """
+    Partition added in Hive can be viewed in Impala after refreshing
+    partition.
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    self.client.execute(
+        'create table %s (x int) partitioned by (y int, z int)' %
+        table_name)
+    assert [] == self.impala_partition_names(table_name)
+    self.run_stmt_in_hive(
+        'alter table %s add partition (y=333, z=5309)' % table_name)
+    # Make sure Impala can't see the partition yet
+    assert [] == self.impala_partition_names(table_name)
+    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
+    # Impala can see the partition
+    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    # Impala's refresh didn't alter Hive's knowledge of the partition
+    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
+
+  def test_drop_hive_partition_and_refresh(self, vector, unique_database):
+    """
+    Partition dropped in Hive is removed in Impala as well after refreshing
+    partition.
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    self.client.execute(
+        'create table %s (x int) partitioned by (y int, z int)' %
+        table_name)
+    self.client.execute(
+        'alter table %s add partition (y=333, z=5309)' % table_name)
+    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    self.run_stmt_in_hive(
+        'alter table %s drop partition (y=333, z=5309)' % table_name)
+    # Make sure Impala can still see the partition
+    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
+    # Impala can see the partition is not there anymore
+    assert [] == self.impala_partition_names(table_name)
+    # Impala's refresh didn't alter Hive's knowledge of the partition
+    assert [] == self.hive_partition_names(table_name)
+
+  def test_add_data_and_refresh(self, vector, unique_database):
+    """
+    Data added through hive is visible in impala after refresh of partition.
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    self.client.execute(
+        'create table %s (x int) partitioned by (y int, z int)' %
+        table_name)
+    self.client.execute(
+        'alter table %s add partition (y=333, z=5309)' % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str('0')]
+    self.run_stmt_in_hive(
+        'insert into table %s partition (y=333, z=5309) values (2)'
+        % table_name)
+    # Make sure its still shows the same result before refreshing
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str('0')]
+
+    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
+    assert '2\t333\t5309' == self.client.execute(
+        'select * from %s' % table_name).get_data()
+
+  def test_refresh_invalid_partition(self, vector, unique_database):
+    """
+    Trying to refresh a partition that does not exist does not modify anything
+    either in impala or hive.
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    self.client.execute(
+        'create table %s (x int) partitioned by (y int, z int)' %
+        table_name)
+    self.client.execute(
+        'alter table %s add partition (y=333, z=5309)' % table_name)
+    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
+    self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
+    assert [['333', '5309']] == self.impala_partition_names(table_name)
+    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
+
+  def test_remove_data_and_refresh(self, vector, unique_database):
+    """
+    Data removed through hive is visible in impala after refresh of partition.
+    """
+    expected_error = 'Error(2): No such file or directory'
+    table_name = unique_database + '.' + "partition_test_table"
+    self.client.execute(
+        'create table %s (x int) partitioned by (y int, z int)' %
+        table_name)
+    self.client.execute(
+        'alter table %s add partition (y=333, z=5309)' % table_name)
+    self.client.execute(
+        'insert into table %s partition (y=333, z=5309) values (2)' % 
table_name)
+    assert '2\t333\t5309' == self.client.execute(
+        'select * from %s' % table_name).get_data()
+
+    self.run_stmt_in_hive(
+        'alter table %s drop partition (y=333, z=5309)' % table_name)
+        # Query the table and check for expected error.
+    try:
+      self.client.execute("select * from %s" % table_name)
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert expected_error in str(e)
+
+    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str('0')]
+
+  def test_add_delete_data_to_hdfs_and_refresh(self, vector, unique_database):
+    """
+    Data added/deleted directly in HDFS is visible in impala after refresh of
+    partition.
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
+    file_name = "alltypes.parq"
+    src_file = 
get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
+      "day=9/*.parq")
+    file_num_rows = 1000
+    self.client.execute("""
+      create table %s like functional.alltypes stored as parquet
+      location '%s'
+    """ % (table_name, table_location))
+    self.client.execute("alter table %s add partition (year=2010, month=1)" %
+        table_name)
+    self.client.execute("refresh %s" % table_name)
+    # Check that there is no data in table
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(0)]
+    dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
+    check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path], shell=False)
+    # Check that data added is not visible before refresh
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(0)]
+    # Chech that data is visible after refresh
+    self.client.execute("refresh %s partition (year=2010, month=1)" % 
table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(file_num_rows)]
+    # Check that after deleting the file and refreshing, it returns zero rows
+    check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
+    self.client.execute("refresh %s partition (year=2010, month=1)" % 
table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(0)]
+
+  def test_confirm_individual_refresh(self, vector, unique_database):
+    """
+    Data added directly to HDFS is only visible for the partition refreshed
+    """
+    table_name = unique_database + '.' + "partition_test_table"
+    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
+    file_name = "alltypes.parq"
+    src_file = 
get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
+      "day=9/*.parq")
+    file_num_rows = 1000
+    self.client.execute("""
+      create table %s like functional.alltypes stored as parquet
+      location '%s'
+    """ % (table_name, table_location))
+    for month in [1, 2]:
+        self.client.execute("alter table %s add partition (year=2010, 
month=%s)" %
+        (table_name, month))
+    self.client.execute("refresh %s" % table_name)
+    # Check that there is no data in table
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(0)]
+    dst_path = table_location + "/year=2010/month=%s/" + file_name
+    for month in [1, 2]:
+        check_call(["hadoop", "fs", "-cp", "-f", src_file, dst_path % month],
+                   shell=False)
+    # Check that data added is not visible before refresh
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(0)]
+    # Check that data is visible after refresh on the first partition only
+    self.client.execute("refresh %s partition (year=2010, month=1)" %
+        table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(file_num_rows)]
+    # Check that the data is not yet visible for the second partition
+    # that was not refreshed
+    result = self.client.execute(
+        "select count(*) from %s where year=2010 and month=2" % table_name)
+    assert result.data == [str(0)]
+    # Check that data is visible for the second partition after refresh
+    self.client.execute("refresh %s partition (year=2010, month=2)" % 
table_name)
+    result = self.client.execute("select count(*) from %s" % table_name)
+    assert result.data == [str(file_num_rows*2)]

Reply via email to