This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0223cbf  HIVE-21063 : Support statistics in cachedStore for transactional table. (Mahesh Kumar Behera, reviewed by Sankar Hariappan, Daniel Dai)
0223cbf is described below

commit 0223cbf15ae2b27d4879627760a4dab4d7cc713f
Author: Mahesh Kumar Behera <[email protected]>
AuthorDate: Fri Feb 8 11:51:00 2019 +0530

    HIVE-21063 : Support statistics in cachedStore for transactional table. (Mahesh Kumar Behera, reviewed by Sankar Hariappan, Daniel Dai)
---
 .../hcatalog/listener/DbNotificationListener.java  |   4 +-
 .../TestCachedStoreUpdateUsingEvents.java          | 714 ++++++++++++++++++---
 .../hadoop/hive/ql/stats/StatsUpdaterThread.java   |   4 +-
 .../hadoop/hive/metastore/HiveMetaStore.java       |  25 +-
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  10 +-
 .../hadoop/hive/metastore/cache/CachedStore.java   | 157 +++--
 .../hadoop/hive/metastore/cache/SharedCache.java   | 245 ++++++-
 .../events/UpdatePartitionColumnStatEvent.java     |  12 +-
 .../events/UpdateTableColumnStatEvent.java         |  13 +-
 .../hive/metastore/messaging/MessageBuilder.java   |   8 +-
 .../UpdatePartitionColumnStatMessage.java          |   2 -
 .../messaging/UpdateTableColumnStatMessage.java    |   2 -
 .../json/JSONUpdatePartitionColumnStatMessage.java |  10 +-
 .../json/JSONUpdateTableColumnStatMessage.java     |  10 +-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   2 +
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |   3 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |   2 +
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |   3 +
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   2 +
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |   3 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   2 +
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |   3 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   2 +
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   3 +
 .../hadoop/hive/metastore/TestHiveMetaStore.java   |  10 +-
 25 files changed, 1004 insertions(+), 247 deletions(-)

diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 8404e3e..963b227 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -759,7 +759,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
             
.buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
                     updateTableColumnStatEvent.getTableObj(),
                     updateTableColumnStatEvent.getTableParameters(),
-                    updateTableColumnStatEvent.getValidWriteIds(), 
updateTableColumnStatEvent.getWriteId());
+                    updateTableColumnStatEvent.getWriteId());
     NotificationEvent event = new NotificationEvent(0, now(), 
EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
                     msgEncoder.getSerializer().serialize(msg));
     ColumnStatisticsDesc statDesc = 
updateTableColumnStatEvent.getColStats().getStatsDesc();
@@ -789,7 +789,7 @@ public class DbNotificationListener extends 
TransactionalMetaStoreEventListener
                     updatePartColStatEvent.getPartVals(),
                     updatePartColStatEvent.getPartParameters(),
                     updatePartColStatEvent.getTableObj(),
-                    updatePartColStatEvent.getValidWriteIds(), 
updatePartColStatEvent.getWriteId());
+                    updatePartColStatEvent.getWriteId());
     NotificationEvent event = new NotificationEvent(0, now(), 
EventType.UPDATE_PARTITION_COLUMN_STAT.toString(),
                     msgEncoder.getSerializer().serialize(msg));
     ColumnStatisticsDesc statDesc = 
updatePartColStatEvent.getPartColStats().getStatsDesc();
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
index 83f12a5..cdfc60c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
@@ -4,6 +4,8 @@ import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.*;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -12,7 +14,10 @@ import 
org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
 import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.junit.Assert;
@@ -29,6 +34,7 @@ public class TestCachedStoreUpdateUsingEvents {
   private SharedCache sharedCache;
   private Configuration conf;
   private HiveMetaStore.HMSHandler hmsHandler;
+  private String[] colType = new String[] {"double", "string"};
 
   @Before
   public void setUp() throws Exception {
@@ -39,11 +45,14 @@ public class TestCachedStoreUpdateUsingEvents {
     MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS, 
DbNotificationListener.class.getName());
     MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL, 
"org.apache.hadoop.hive.metastore.cache.CachedStore");
     MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, 
true);
+    MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
+    MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, 
false);
     MetaStoreTestUtils.setConfForStandloneMode(conf);
 
     hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true);
 
-    rawStore = hmsHandler.getMS();
+    rawStore = new ObjectStore();
+    rawStore.setConf(hmsHandler.getConf());
     sharedCache = CachedStore.getSharedCache();
 
     // Stop the CachedStore cache update service. We'll start it explicitly to 
control the test
@@ -69,10 +78,14 @@ public class TestCachedStoreUpdateUsingEvents {
     String serdeLocation = "file:/tmp";
     Map<String, String> serdeParams = new HashMap<>();
     SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
-    StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", 
"output", false, 0,
+    StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation,
+            null, null, false, 3,
             serdeInfo, null, null, serdeParams);
+    sd.setInputFormat(OrcInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
     sd.setStoredAsSubDirectories(false);
-    Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, 
tblParams, null, null,
+    Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, 
tblParams,
+            null, null,
             TableType.MANAGED_TABLE.toString());
     tbl.setCatName(DEFAULT_CATALOG_NAME);
     return tbl;
@@ -186,6 +199,10 @@ public class TestCachedStoreUpdateUsingEvents {
     long lastEventId = -1;
     RawStore rawStore = hmsHandler.getMS();
 
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
     // Add a db via rawStore
     String dbName = "test_table_ops";
     String dbOwner = "user1";
@@ -193,10 +210,6 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.create_database(db);
     db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
-
     // Add a table via rawStore
     String tblName = "tbl";
     String tblOwner = "user1";
@@ -263,6 +276,10 @@ public class TestCachedStoreUpdateUsingEvents {
     long lastEventId = -1;
     RawStore rawStore = hmsHandler.getMS();
 
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
     // Add a db via rawStore
     String dbName = "test_partition_ops";
     String dbOwner = "user1";
@@ -285,21 +302,19 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.create_table(tbl);
     tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
-
     final String ptnColVal1 = "aaa";
     Map<String, String> partParams = new HashMap<String, String>();
     Partition ptn1 =
-            new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, 
tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn1.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn1);
     ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal1));
 
     final String ptnColVal2 = "bbb";
     Partition ptn2 =
-            new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, 
tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn2.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn2);
     ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal2));
@@ -307,17 +322,21 @@ public class TestCachedStoreUpdateUsingEvents {
     // Read database, table, partition via CachedStore
     Database dbRead = 
sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), 
dbName.toLowerCase());
     Assert.assertEquals(db, dbRead);
-    Table tblRead = 
sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), 
dbName.toLowerCase(), tblName.toLowerCase());
+    Table tblRead = 
sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase());
     compareTables(tbl, tblRead);
-    Partition ptn1Read = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), 
dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
+    Partition ptn1Read = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase(), 
Arrays.asList(ptnColVal1));
     comparePartitions(ptn1, ptn1Read);
-    Partition ptn2Read = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), 
dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2));
+    Partition ptn2Read = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+            dbName.toLowerCase(), tblName.toLowerCase(), 
Arrays.asList(ptnColVal2));
     comparePartitions(ptn2, ptn2Read);
 
     // Add a new partition via rawStore
     final String ptnColVal3 = "ccc";
     Partition ptn3 =
-            new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, 
tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn3.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.add_partition(ptn3);
     ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal3));
@@ -326,7 +345,8 @@ public class TestCachedStoreUpdateUsingEvents {
     ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal1));
     final String ptnColVal1Alt = "aaa";
     Partition ptn1Atl =
-            new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, 
tbl.getSd(), partParams);
+            new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0,
+                    0, tbl.getSd(), partParams);
     ptn1Atl.setCatName(DEFAULT_CATALOG_NAME);
     hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl));
     ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal1Alt));
@@ -336,7 +356,8 @@ public class TestCachedStoreUpdateUsingEvents {
     hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), 
false);
 
     // Read the newly added partition via CachedStore
-    Partition ptnRead = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, 
Arrays.asList(ptnColVal3));
+    Partition ptnRead = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName,
+            tblName, Arrays.asList(ptnColVal3));
     comparePartitions(ptn3, ptnRead);
 
     // Read the altered partition via CachedStore
@@ -362,49 +383,116 @@ public class TestCachedStoreUpdateUsingEvents {
     sharedCache.getSdCache().clear();
   }
 
-  @Test
-  public void testTableColumnStatistics() throws Throwable {
-    String dbName = "column_stats_test_db";
-    String tblName = "tbl";
-    String typeName = "person";
-    String tblOwner = "testowner";
-    int lastAccessed = 6796;
-    String dbOwner = "user1";
+  private void updateTableColStats(String dbName, String tblName, String[] 
colName,
+                                   double highValue, double avgColLen, boolean 
isTxnTable) throws Throwable {
+    long writeId = -1;
+    String validWriteIds = null;
+    if (isTxnTable) {
+      writeId = allocateWriteIds(allocateTxns(1), dbName, 
tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
 
-    // Add a db via rawStore
-    Database db = createTestDb(dbName, dbOwner);
-    hmsHandler.create_database(db);
-    db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setIsTblLevel(true);
+    statsDesc.setPartName(null);
 
-    Map<String, String> tableParams =  new HashMap<>();
-    tableParams.put("test_param_1", "hi");
-    tableParams.put("test_param_2", "50");
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, 
avgColLen));
 
-    // Add a table via rawStore
-    List<FieldSchema> cols = new ArrayList<FieldSchema>();
-    cols.add(new FieldSchema("income", "int", "integer column"));
-    cols.add(new FieldSchema("name", "string", "string column"));
+    SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
 
-    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
-    ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
-    ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+    // write stats objs persistently
+    hmsHandler.update_table_column_statistics_req(setTblColStat);
+    validateTablePara(dbName, tblName);
+
+    ColumnStatistics colStatsCache = 
sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Lists.newArrayList(colName[0]), validWriteIds, 
true);
+    Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), 
colName[0]);
+    verifyStatDouble(colStatsCache.getStatsObj().get(0), colName[0], 
highValue);
+
+    colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, 
true);
+    Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), 
colName[1]);
+    verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], 
avgColLen);
+  }
 
-    Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, 
tableParams);
-    hmsHandler.create_table(tbl);
+  private void updatePartColStats(String dbName, String tblName, boolean 
isTxnTable, String[] colName,
+                                  String partName, double highValue, double 
avgColLen) throws Throwable {
+    long writeId = -1;
+    String validWriteIds = null;
+    List<Long> txnIds = null;
 
-    // Prewarm CachedStore
-    CachedStore.setCachePrewarmedState(false);
-    CachedStore.prewarm(rawStore);
+    if (isTxnTable) {
+      txnIds = allocateTxns(1);
+      writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
 
-    // Create a ColumnStatistics Obj
-    String[] colName = new String[]{"income", "name"};
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setIsTblLevel(false);
+    statsDesc.setPartName(partName);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, 
avgColLen));
+
+    SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+
+    // write stats objs persistently
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    if (isTxnTable) {
+      CommitTxnRequest rqst = new CommitTxnRequest(txnIds.get(0));
+      hmsHandler.commit_txn(rqst);
+      writeId = allocateWriteIds(allocateTxns(1), dbName, 
tblName).get(0).getWriteId();
+      validWriteIds = getValidWriteIds(dbName, tblName);
+    }
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statRowStore = 
rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(partName), 
Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+    verifyStatString(statRowStore.get(0).getStatsObj().get(0), colName[1], 
avgColLen);
+    if (isTxnTable) {
+      Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), true);
+    } else {
+      Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false);
+    }
+
+    List<ColumnStatistics> statSharedCache = 
sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Collections.singletonList(partName), 
Collections.singletonList(colName[1]),
+            validWriteIds, true);
+    verifyStatString(statSharedCache.get(0).getStatsObj().get(0), colName[1], 
avgColLen);
+    if (isTxnTable) {
+      Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), true);
+    } else {
+      Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), false);
+    }
+
+    SharedCache.ColumStatsWithWriteId statPartCache = 
sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, CachedStore.partNameToVals(partName), colName[0], 
validWriteIds);
+    verifyStatDouble(statPartCache.getColumnStatisticsObj(), colName[0], 
highValue);
+
+    statPartCache = 
sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+            CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+    verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], 
avgColLen);
+  }
+
+  private List<ColumnStatisticsObj> getStatsObjects(String dbName, String 
tblName, String[] colName,
+                                                    double highValue, double 
avgColLen) throws Throwable {
     double lowValue = 50000.21;
-    double highValue = 1200000.4525;
     long numNulls = 3;
     long numDVs = 22;
-    double avgColLen = 50.30;
     long maxColLen = 102;
-    String[] colType = new String[] {"double", "string"};
     boolean isTblLevel = true;
     String partName = null;
     List<ColumnStatisticsObj> statsObjs = new ArrayList<>();
@@ -445,42 +533,96 @@ public class TestCachedStoreUpdateUsingEvents {
 
     statsObj.setStatsData(statsData);
     statsObjs.add(statsObj);
+    return statsObjs;
+  }
 
-    ColumnStatistics colStats = new ColumnStatistics();
-    colStats.setStatsDesc(statsDesc);
-    colStats.setStatsObj(statsObjs);
+  private void verifyStatDouble(ColumnStatisticsObj colStats, String colName, 
double highValue) {
+    double lowValue = 50000.21;
+    long numNulls = 3;
+    long numDVs = 22;
+    Assert.assertEquals(colStats.getColName(), colName);
+    
Assert.assertEquals(colStats.getStatsData().getDoubleStats().getLowValue(), 
lowValue, 0.01);
+    
Assert.assertEquals(colStats.getStatsData().getDoubleStats().getHighValue(), 
highValue, 0.01);
+    
Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumNulls(), 
numNulls);
+    Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumDVs(), 
numDVs);
+  }
 
-    // write stats objs persistently
-    hmsHandler.update_table_column_statistics(colStats);
-
-    ColumnStatisticsObj colStatsCache = 
sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
-            dbName, tblName, Lists.newArrayList(colName[0])).get(0);
-    Assert.assertEquals(colStatsCache.getColName(), colName[0]);
-    
Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getLowValue(),
 lowValue, 0.01);
-    
Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getHighValue(),
 highValue, 0.01);
-    
Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumNulls(),
 numNulls);
-    
Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumDVs(), 
numDVs);
-
-    // test delete column stats; if no col name is passed all column stats 
associated with the
-    // table is deleted
-    boolean status = hmsHandler.delete_table_column_statistics(dbName, 
tblName, null);
-    Assert.assertEquals(status, true);
+  private void verifyStatString(ColumnStatisticsObj colStats, String colName, 
double avgColLen) {
+    long numNulls = 3;
+    long numDVs = 22;
+    long maxColLen = 102;
+    Assert.assertEquals(colStats.getColName(), colName);
+    
Assert.assertEquals(colStats.getStatsData().getStringStats().getMaxColLen(), 
maxColLen);
+    
Assert.assertEquals(colStats.getStatsData().getStringStats().getAvgColLen(), 
avgColLen, 0.01);
+    
Assert.assertEquals(colStats.getStatsData().getStringStats().getNumNulls(), 
numNulls);
+    Assert.assertEquals(colStats.getStatsData().getStringStats().getNumDVs(), 
numDVs);
+  }
 
-    
Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
-            dbName, tblName, Lists.newArrayList(colName[0])).isEmpty(), true);
+  private void verifyStat(List<ColumnStatisticsObj> colStats, String[] 
colName, double highValue, double avgColLen) {
+    //verifyStatDouble(colStats.get(0), colName[0], highValue);
+    verifyStatString(colStats.get(0), colName[1], avgColLen);
+  }
 
-    tblName = "tbl_part";
-    cols = new ArrayList<>();
-    cols.add(new FieldSchema(colName[0], "int", null));
+  private void setUpBeforeTest(String dbName, String tblName, String[] 
colName, boolean isTxnTable) throws Throwable {
+    String dbOwner = "user1";
+
+    // Prewarm CachedStore
+    CachedStore.setCachePrewarmedState(false);
+    CachedStore.prewarm(rawStore);
+
+    // Add a db via rawStore
+    Database db = createTestDb(dbName, dbOwner);
+    hmsHandler.create_database(db);
+    if (tblName != null) {
+      createTestTable(dbName, tblName, colName, isTxnTable);
+    }
+  }
+
+  private void createTestTable(String dbName, String tblName, String[] 
colName, boolean isTxnTable) throws Throwable {
+    // Add a table via rawStore
+    List<FieldSchema> cols = new ArrayList<FieldSchema>();
+    cols.add(new FieldSchema(colName[0], "int", "integer column"));
+    cols.add(new FieldSchema(colName[1], "string", "string column"));
+
+    Map<String, String> tableParams =  new HashMap<>();
+    tableParams.put("test_param_1", "hi");
+    tableParams.put("test_param_2", "50");
+    if (isTxnTable) {
+      tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    }
+
+    String tblOwner = "testowner";
+
+    List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+    ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
+    ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+
+    Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, 
tableParams);
+    hmsHandler.create_table(tbl);
+  }
+
+  private void createTableWithPart(String dbName, String tblName, String[] 
colName, boolean isTxnTbl) throws Throwable {
+    List<FieldSchema> cols = new ArrayList<>();
+    cols.add(new FieldSchema(colName[0], colType[0], null));
     List<FieldSchema> partCols = new ArrayList<>();
-    partCols.add(new FieldSchema("col", "int", null));
+    partCols.add(new FieldSchema(colName[0], colType[0], null));
+    Map<String, String> tableParams =  new HashMap<>();
+    tableParams.put("test_param_1", "hi");
+    tableParams.put("test_param_2", "50");
+    if (isTxnTbl) {
+      tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+      StatsSetupConst.setBasicStatsState(tableParams, StatsSetupConst.TRUE);
+    }
     StorageDescriptor sd =
-            new StorageDescriptor(cols, null, "input", "output", false,
+            new StorageDescriptor(cols, null, "orc",
+                    "orc", false,
                     0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
-                    null, null, null);
+                    null, null, tableParams);
+    sd.setInputFormat(OrcInputFormat.class.getName());
+    sd.setOutputFormat(OrcOutputFormat.class.getName());
 
-    tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new 
HashMap<>(),
-                    null, null, TableType.MANAGED_TABLE.toString());
+    Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd,
+            partCols, tableParams, null, null, 
TableType.MANAGED_TABLE.toString());
     tbl.setCatName(DEFAULT_CATALOG_NAME);
 
     hmsHandler.create_table(tbl);
@@ -489,47 +631,405 @@ public class TestCachedStoreUpdateUsingEvents {
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
+    Map<String, String> partParams =  new HashMap<>();
+    StatsSetupConst.setBasicStatsState(partParams, StatsSetupConst.TRUE);
+    EnvironmentContext environmentContext = new EnvironmentContext();
+    environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, 
StatsSetupConst.TASK);
 
     Partition ptn1 =
-            new Partition(partVals1, dbName, tblName, 0, 0, sd, new 
HashMap<>());
+            new Partition(partVals1, dbName, tblName, 0, 0, sd, partParams);
     ptn1.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn1);
+    hmsHandler.add_partition_with_environment_context(ptn1, 
environmentContext);
     Partition ptn2 =
-            new Partition(partVals2, dbName, tblName, 0, 0, sd, new 
HashMap<>());
+            new Partition(partVals2, dbName, tblName, 0, 0, sd, partParams);
     ptn2.setCatName(DEFAULT_CATALOG_NAME);
-    hmsHandler.add_partition(ptn2);
+    hmsHandler.add_partition_with_environment_context(ptn2, 
environmentContext);
+  }
+
+  /**
+   * Opens new transactions through the metastore handler.
+   *
+   * @param numTxns number of transactions to request from the handler
+   * @return the transaction ids reported by the handler for the opened transactions
+   * @throws Throwable if the handler call fails
+   */
+  private List<Long> allocateTxns(int numTxns) throws Throwable {
+    // Pass the caller's count through so that allocateTxns(n) really requests n transactions.
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(numTxns, "user", "host");
+    return hmsHandler.open_txns(openTxnRequest).getTxn_ids();
+  }
+
+  /**
+   * Requests write ids on {@code dbName.tblName} for the given transactions and returns
+   * the txn-to-write-id entries reported by the handler.
+   */
+  private List<TxnToWriteId> allocateWriteIds(List<Long> txnIds, String dbName, String tblName)
+      throws Throwable {
+    AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tblName);
+    request.setTxnIds(txnIds);
+    return hmsHandler.allocate_table_write_ids(request).getTxnToWriteIds();
+  }
+
+  /**
+   * Asks the handler for the valid write ids of the given table and returns the resulting
+   * reader write id list in its string form.
+   */
+  private String getValidWriteIds(String dbName, String tblName) throws Throwable {
+    GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+        Collections.singletonList(TableName.getDbTable(dbName, tblName)));
+    GetValidWriteIdsResponse response = hmsHandler.get_valid_write_ids(request);
+    // A single table was requested, so only the first entry of the response is read.
+    return TxnCommonUtils.createValidReaderWriteIdList(response.getTblValidWriteIds().get(0))
+        .writeToString();
+  }
+
+  /**
+   * Asserts that the table parameters read from the raw store equal the parameters of
+   * the same table held in the shared cache.
+   */
+  private void validateTablePara(String dbName, String tblName) throws Throwable {
+    Table fromRawStore = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+    Table fromCache = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+    Map<String, String> rawStoreParams = fromRawStore.getParameters();
+    Map<String, String> cachedParams = fromCache.getParameters();
+    Assert.assertEquals(rawStoreParams, cachedParams);
+  }
 
+  /**
+   * Placeholder for comparing a partition's parameters in the raw store against the
+   * shared cache. The comparison below is commented out, so this method currently
+   * performs no check.
+   */
+  private void validatePartPara(String dbName, String tblName, String 
partName) throws Throwable {
+    //Partition part1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, 
tblName, partName);
+    //Partition part2 = 
sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, 
partName);
+    //Assert.assertEquals(part1.getParameters(), part2.getParameters());
+  }
+
+  /**
+   * Deletes table-level column statistics through the handler and verifies that the
+   * shared cache no longer returns statistics for any of the given columns, then checks
+   * that cached and stored table parameters still agree.
+   *
+   * @param dbName database of the table
+   * @param tblName table whose column statistics are deleted
+   * @param colName columns whose cached statistics must be empty afterwards
+   * @throws Throwable if the handler call or one of the verifications fails
+   */
+  private void deleteColStats(String dbName, String tblName, String[] colName) throws Throwable {
+    // The handler is called with a null column name (presumably meaning "all columns").
+    boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null);
+    Assert.assertTrue("delete_table_column_statistics reported failure", status);
+    for (String col : colName) {
+      Assert.assertTrue("column stats still cached for column " + col,
+          sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+              Lists.newArrayList(col), null, true).getStatsObj().isEmpty());
+    }
+    validateTablePara(dbName, tblName);
+  }
+
+  /**
+   * Deletes the partition-level statistics of the second column ({@code colName[1]})
+   * through the handler and verifies that the shared cache no longer holds a statistics
+   * object for it, then checks that cached and stored table parameters still agree.
+   *
+   * @param dbName database of the table
+   * @param tblName table owning the partition
+   * @param colName column names; only {@code colName[1]} is deleted and checked
+   * @param partName name of the partition whose column statistics are deleted
+   * @throws Throwable if the handler call or one of the verifications fails
+   */
+  private void deletePartColStats(String dbName, String tblName, String[] colName,
+                                  String partName) throws Throwable {
+    boolean status =
+        hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
+    Assert.assertTrue("delete_partition_column_statistics reported failure", status);
+
+    SharedCache.ColumStatsWithWriteId colStats = sharedCache.getPartitionColStatsFromCache(
+        DEFAULT_CATALOG_NAME, dbName, tblName, CachedStore.partNameToVals(partName),
+        colName[1], null);
+    Assert.assertNull("column stats still cached for column " + colName[1],
+        colStats.getColumnStatisticsObj());
+    validateTablePara(dbName, tblName);
+  }
+
+  private void testTableColStatInternal(String dbName, String tblName, boolean 
isTxnTable) throws Throwable {
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4525;
+    double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, tblName, colName, isTxnTable);
+    updateTableColStats(dbName, tblName, colName, highValue, avgColLen, 
isTxnTable);
+    if (!isTxnTable) {
+      deleteColStats(dbName, tblName, colName);
+    }
+
+    tblName = "tbl_part";
+    createTableWithPart(dbName, tblName, colName, isTxnTable);
     List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, 
(short)-1);
-    partName = partitions.get(0);
-    isTblLevel = false;
+    String partName = partitions.get(0);
+    updatePartColStats(dbName, tblName, isTxnTable, colName, partName, 
highValue, avgColLen);
+    if (!isTxnTable) {
+      deletePartColStats(dbName, tblName, colName, partName);
+    }
+  }
+
+  /** Runs the shared column statistics scenario with the transactional flag off. */
+  @Test
+  public void testTableColumnStatistics() throws Throwable {
+    testTableColStatInternal("column_stats_test_db", "tbl", false);
+  }
+
+  /** Runs the shared column statistics scenario with the transactional flag on. */
+  @Test
+  public void testTableColumnStatisticsTxnTable() throws Throwable {
+    testTableColStatInternal("column_stats_test_db_txn", "tbl_txn", true);
+  }
+
+  /**
+   * Applies three successive partition column stats updates to the first partition of a
+   * table created with the transactional flag, varying the high value and the average
+   * column length between updates.
+   */
+  @Test
+  public void testTableColumnStatisticsTxnTableMulti() throws Throwable {
+    final String dbName = "column_stats_test_db_txn_multi";
+    final String tblName = "tbl_part";
+    final String[] colName = new String[]{"income", "name"};
+    final double highValue = 1200000.4525;
+    final double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    String partName = hmsHandler.get_partition_names(dbName, tblName, (short) -1).get(0);
+
+    // Initial stats, then a changed high value, then a changed average column length.
+    updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen);
+    updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen);
+    updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78);
+  }
+
+  @Test
+  public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
+    String dbName = "column_stats_test_db_txn_multi_abort";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+    double highValue = 1200000.4525;
+    double avgColLen = 50.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, 
(short)-1);
+    String partName = partitions.get(0);
+
+    List<Long> txnIds = allocateTxns(1);
+    long writeId = allocateWriteIds(txnIds, dbName, 
tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
 
     // create a new columnstatistics desc to represent partition level column 
stats
-    statsDesc = new ColumnStatisticsDesc();
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
     statsDesc.setDbName(dbName);
     statsDesc.setTableName(tblName);
     statsDesc.setPartName(partName);
-    statsDesc.setIsTblLevel(isTblLevel);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, 
avgColLen));
+
+    SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+
+    // write stats objs persistently
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    // abort the txn and verify that the stats got is not compliant.
+    AbortTxnRequest rqst = new AbortTxnRequest(txnIds.get(0));
+    hmsHandler.abort_txn(rqst);
+
+    allocateWriteIds(allocateTxns(1), dbName, tblName);
+    validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statRawStore = 
rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+            Collections.singletonList(partName), 
Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+
+    verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, 
avgColLen);
+    Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+
+    List<ColumnStatistics> statsListFromCache = 
sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+            dbName, tblName, Collections.singletonList(partName), 
Collections.singletonList(colName[1]),
+            validWriteIds, true);
+    verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, 
avgColLen);
+    Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false);
+
+    SharedCache.ColumStatsWithWriteId columStatsWithWriteId =
+            sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, 
dbName, tblName,
+              CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+    Assert.assertEquals(columStatsWithWriteId, null);
+    validatePartPara(dbName, tblName, partName);
+  }
+
+  /**
+   * Writes partition column stats under a write id whose transaction is neither committed
+   * nor aborted in this test, then reads them back with a newer valid write id list and
+   * expects both the raw store and the shared cache to flag them as not compliant.
+   */
+  @Test
+  public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
+    final String dbName = "column_stats_test_db_txn_multi_open";
+    final String tblName = "tbl_part";
+    final String[] colName = new String[]{"income", "name"};
+    final double highValue = 1200000.4121;
+    final double avgColLen = 23.30;
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    String partName = hmsHandler.get_partition_names(dbName, tblName, (short) -1).get(0);
+
+    // Baseline update that goes through the regular helper.
+    updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2);
+
+    // Open a txn and take a write id for the second, directly issued update.
+    List<Long> openTxnIds = allocateTxns(1);
+    long openWriteId = allocateWriteIds(openTxnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // Descriptor identifying the partition-level column stats being written.
+    ColumnStatisticsDesc partStatsDesc = new ColumnStatisticsDesc();
+    partStatsDesc.setDbName(dbName);
+    partStatsDesc.setTableName(tblName);
+    partStatsDesc.setPartName(partName);
+    partStatsDesc.setIsTblLevel(false);
+
+    ColumnStatistics partColStats = new ColumnStatistics();
+    partColStats.setStatsDesc(partStatsDesc);
+    partColStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+    SetPartitionsStatsRequest setStatsRequest =
+        new SetPartitionsStatsRequest(Collections.singletonList(partColStats));
+    setStatsRequest.setWriteId(openWriteId);
+    setStatsRequest.setValidWriteIdList(validWriteIds);
+
+    // Persist the stats; the txn that owns openWriteId is left open.
+    hmsHandler.update_partition_column_statistics_req(setStatsRequest);
+
+    // Allocate another write id and re-read the valid write id list used for the lookups.
+    allocateWriteIds(allocateTxns(1), dbName, tblName);
+    validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionColumnStatistics");
+    List<ColumnStatistics> statsFromRawStore = rawStore.getPartitionColumnStatistics(
+        DEFAULT_CATALOG_NAME, dbName, tblName, Collections.singletonList(partName),
+        Collections.singletonList(colName[1]), validWriteIds);
+    Deadline.stopTimer();
+
+    verifyStat(statsFromRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statsFromRawStore.get(0).isIsStatsCompliant(), false);
+
+    List<ColumnStatistics> statsFromCache = sharedCache.getPartitionColStatsListFromCache(
+        DEFAULT_CATALOG_NAME, dbName, tblName, Collections.singletonList(partName),
+        Collections.singletonList(colName[1]), validWriteIds, true);
+    verifyStat(statsFromCache.get(0).getStatsObj(), colName, highValue, avgColLen);
+    Assert.assertEquals(statsFromCache.get(0).isIsStatsCompliant(), false);
+
+    // The single-column cache lookup is expected to return nothing for this write id list.
+    SharedCache.ColumStatsWithWriteId singleColStats = sharedCache.getPartitionColStatsFromCache(
+        DEFAULT_CATALOG_NAME, dbName, tblName, CachedStore.partNameToVals(partName), colName[1],
+        validWriteIds);
+    Assert.assertEquals(singleColStats, null);
+    validatePartPara(dbName, tblName, partName);
+  }
+
+  /**
+   * Fetches aggregate stats of the first column ({@code colName[0]}) over the given
+   * partitions from the raw store, from the handler and from the shared cache, and checks
+   * that two partitions were found, that the handler result equals the raw store result,
+   * and that the aggregated high value matches {@code highValue} (tolerance 0.01).
+   * The {@code isTxnTbl} argument is not read by this method.
+   */
+  private void verifyAggrStat(String dbName, String tblName, String[] colName,
+                              List<String> partitions, boolean isTxnTbl, double highValue)
+      throws Throwable {
+    // Open a txn and allocate a write id before reading the valid write id list.
+    List<Long> newTxnIds = allocateTxns(1);
+    allocateWriteIds(newTxnIds, dbName, tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats fromRawStore = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName,
+        partitions, Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(fromRawStore.getPartsFound(), 2);
+    Assert.assertEquals(
+        fromRawStore.getColStats().get(0).getStatsData().getDoubleStats().getHighValue(),
+        highValue, 0.01);
+    //Assert.assertEquals(aggrStats.isIsStatsCompliant(), true);
+
+    // This will update the cache for non txn table.
+    PartitionsStatsRequest aggrRequest = new PartitionsStatsRequest(dbName, tblName,
+        Collections.singletonList(colName[0]), partitions);
+    aggrRequest.setCatName(DEFAULT_CATALOG_NAME);
+    aggrRequest.setValidWriteIdList(validWriteIds);
+    AggrStats fromHandler = hmsHandler.get_aggr_stats_for(aggrRequest);
+    Assert.assertEquals(fromHandler, fromRawStore);
+    //Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true);
+
+    List<ColumnStatisticsObj> cachedAggr = sharedCache.getAggrStatsFromCache(
+        DEFAULT_CATALOG_NAME, dbName, tblName, Collections.singletonList(colName[0]),
+        SharedCache.StatsType.ALL);
+    Assert.assertEquals(
+        cachedAggr.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+  }
+
+  /**
+   * Checks aggregate partition column stats on a table created without the transactional
+   * flag, before and after lowering the high value recorded for the second listed partition.
+   */
+  @Test
+  public void testAggrStat() throws Throwable {
+    final String dbName = "aggr_stats_test";
+    final String tblName = "tbl_part";
+    final String[] colName = new String[]{"income", "name"};
+
+    setUpBeforeTest(dbName, null, colName, false);
+    createTableWithPart(dbName, tblName, colName, false);
+    List<String> partNames = hmsHandler.get_partition_names(dbName, tblName, (short) -1);
+    String firstPart = partNames.get(0);
+
+    // High values 2 and 4 on the two partitions: the aggregate is expected to report 4.
+    updatePartColStats(dbName, tblName, false, colName, firstPart, 2, 12);
+    updatePartColStats(dbName, tblName, false, colName, partNames.get(1), 4, 10);
+    verifyAggrStat(dbName, tblName, colName, partNames, false, 4);
+
+    // Lower the second partition to 3: the aggregate is expected to report 3.
+    updatePartColStats(dbName, tblName, false, colName, partNames.get(1), 3, 10);
+    verifyAggrStat(dbName, tblName, colName, partNames, false, 3);
+  }
+
+  @Test
+  public void testAggrStatTxnTable() throws Throwable {
+    String dbName = "aggr_stats_test_db_txn";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
 
-    colStats = new ColumnStatistics();
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, 
(short)-1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 
12);
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 
10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 
10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 3);
+
+    List<Long> txnIds = allocateTxns(1);
+    long writeId = allocateWriteIds(txnIds, dbName, 
tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column 
stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
+    colStats.setStatsDesc(statsDesc);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+    SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, 
dbName, tblName, partitions,
+            Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats, null);
+
+    // keep the txn open and verify that the stats got is not compliant.
+    PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, 
tblName,
+            Collections.singletonList(colName[0]), partitions);
+    request.setCatName(DEFAULT_CATALOG_NAME);
+    request.setValidWriteIdList(validWriteIds);
+    AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+    Assert.assertEquals(aggrStatsCached, null);
+  }
+
+  @Test
+  public void testAggrStatAbortTxn() throws Throwable {
+    String dbName = "aggr_stats_test_db_txn_abort";
+    String tblName = "tbl_part";
+    String[] colName = new String[]{"income", "name"};
+
+    setUpBeforeTest(dbName, null, colName, true);
+    createTableWithPart(dbName, tblName, colName, true);
+    List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, 
(short)-1);
+    String partName = partitions.get(0);
+
+    // update part col stats successfully.
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 
12);
+    updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 
10);
+    verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+    List<Long> txnIds = allocateTxns(4);
+    long writeId = allocateWriteIds(txnIds, dbName, 
tblName).get(0).getWriteId();
+    String validWriteIds = getValidWriteIds(dbName, tblName);
+
+    // create a new columnstatistics desc to represent partition level column 
stats
+    ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+    statsDesc.setDbName(dbName);
+    statsDesc.setTableName(tblName);
+    statsDesc.setPartName(partName);
+    statsDesc.setIsTblLevel(false);
+
+    ColumnStatistics colStats = new ColumnStatistics();
     colStats.setStatsDesc(statsDesc);
-    colStats.setStatsObj(statsObjs);
-
-    hmsHandler.update_partition_column_statistics(colStats);
-    ColumnStatisticsObj colStats2 = 
sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-            CachedStore.partNameToVals(partName), colName[1]);
-    // compare stats obj to ensure what we get is what we wrote
-    Assert.assertEquals(colStats.getStatsDesc().getPartName(), partName);
-    Assert.assertEquals(colStats2.getColName(), colName[1]);
-    
Assert.assertEquals(colStats2.getStatsData().getStringStats().getMaxColLen(), 
maxColLen);
-    
Assert.assertEquals(colStats2.getStatsData().getStringStats().getAvgColLen(), 
avgColLen, 0.01);
-    
Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumNulls(), 
numNulls);
-    Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumDVs(), 
numDVs);
-
-    // test stats deletion at partition level
-    hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, 
colName[1]);
-
-    colStats2 = 
sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
-            CachedStore.partNameToVals(partName), colName[1]);
-    Assert.assertEquals(colStats2, null);
+    colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+    SetPartitionsStatsRequest setTblColStat = new 
SetPartitionsStatsRequest(Collections.singletonList(colStats));
+    setTblColStat.setWriteId(writeId);
+    setTblColStat.setValidWriteIdList(validWriteIds);
+    hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+    AbortTxnRequest abortTxnRequest = new AbortTxnRequest(txnIds.get(0));
+    hmsHandler.abort_txn(abortTxnRequest);
+
+    Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+    AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, 
dbName, tblName, partitions,
+            Collections.singletonList(colName[0]), validWriteIds);
+    Deadline.stopTimer();
+    Assert.assertEquals(aggrStats, null);
+
+    // the txn was aborted above; verify that the handler returns no aggregate stats either.
+    PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, 
tblName,
+            Collections.singletonList(colName[0]), partitions);
+    request.setCatName(DEFAULT_CATALOG_NAME);
+    request.setValidWriteIdList(validWriteIds);
+    AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+    Assert.assertEquals(aggrStatsCached, null);
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java 
b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index c028e12..7c1944f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -448,7 +448,7 @@ public class StatsUpdaterThread extends Thread implements 
MetaStoreThread {
     }
     // TODO: we should probably skip updating if writeId is from an active txn
     boolean isTxnValid = (writeIdString == null) || 
ObjectStore.isCurrentStatsValidForTheQuery(
-        conf, params, statsWriteId , writeIdString, false);
+        params, statsWriteId, writeIdString, false);
     return getExistingStatsToUpdate(existingStats, params, isTxnValid);
   }
 
@@ -473,7 +473,7 @@ public class StatsUpdaterThread extends Thread implements 
MetaStoreThread {
     }
     // TODO: we should probably skip updating if writeId is from an active txn
     if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery(
-        conf, params, statsWriteId, writeIdString, false)) {
+        params, statsWriteId, writeIdString, false)) {
       return allCols;
     }
     List<String> colsToUpdate = new ArrayList<>();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index b43fb5e..4472f99 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -3684,7 +3684,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             try {
               boolean madeDir = createLocationForAddedPartition(table, 
partition);
               addedParts.put(new PartValEqWrapperLite(partition), madeDir);
-              initializeAddedPartition(table, partition, madeDir);
+              initializeAddedPartition(table, partition, madeDir, null);
             } catch (MetaException e) {
               throw new IOException(e.getMessage(), e);
             }
@@ -3973,15 +3973,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         return true;
     }
 
-    private void initializeAddedPartition(
-        final Table tbl, final Partition part, boolean madeDir) throws 
MetaException {
-      initializeAddedPartition(tbl, new 
PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir);
+    private void initializeAddedPartition(final Table tbl, final Partition 
part, boolean madeDir,
+                                          EnvironmentContext 
environmentContext) throws MetaException {
+      initializeAddedPartition(tbl,
+              new PartitionSpecProxy.SimplePartitionWrapperIterator(part), 
madeDir, environmentContext);
     }
 
     private void initializeAddedPartition(
-        final Table tbl, final PartitionSpecProxy.PartitionIterator part, 
boolean madeDir) throws MetaException {
+        final Table tbl, final PartitionSpecProxy.PartitionIterator part, 
boolean madeDir,
+        EnvironmentContext environmentContext) throws MetaException {
       if (canUpdateStats(tbl)) {
-        MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, 
false, null, true);
+        MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir,
+                false, environmentContext, true);
       }
 
       // set create time
@@ -4046,7 +4049,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         assert shouldAdd; // start would throw if it already existed here
         boolean madeDir = createLocationForAddedPartition(tbl, part);
         try {
-          initializeAddedPartition(tbl, part, madeDir);
+          initializeAddedPartition(tbl, part, madeDir, envContext);
           initializePartitionParameters(tbl, part);
           success = ms.addPartition(part);
         } finally {
@@ -5989,13 +5992,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (transactionalListeners != null && 
!transactionalListeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                     EventType.UPDATE_TABLE_COLUMN_STAT,
-                    new UpdateTableColumnStatEvent(colStats, tableObj, 
parameters, validWriteIds,
+                    new UpdateTableColumnStatEvent(colStats, tableObj, 
parameters,
                             writeId, this));
           }
           if (!listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
                     EventType.UPDATE_TABLE_COLUMN_STAT,
-                    new UpdateTableColumnStatEvent(colStats, tableObj, 
parameters, validWriteIds,
+                    new UpdateTableColumnStatEvent(colStats, tableObj, 
parameters,
                             writeId,this));
           }
         }
@@ -6055,13 +6058,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (transactionalListeners != null && 
!transactionalListeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
               EventType.UPDATE_PARTITION_COLUMN_STAT,
-              new UpdatePartitionColumnStatEvent(colStats, partVals, 
parameters, tbl, validWriteIds,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, 
parameters, tbl,
                       writeId, this));
           }
           if (!listeners.isEmpty()) {
             MetaStoreListenerNotifier.notifyEvent(listeners,
               EventType.UPDATE_PARTITION_COLUMN_STAT,
-              new UpdatePartitionColumnStatEvent(colStats, partVals, 
parameters, tbl, validWriteIds,
+              new UpdatePartitionColumnStatEvent(colStats, partVals, 
parameters, tbl,
                       writeId, this));
           }
         }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0485fe9..c0bae3b 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -8629,7 +8629,7 @@ public class ObjectStore implements RawStore, 
Configurable {
             // Make sure we set the flag to invalid regardless of the current 
value.
             StatsSetupConst.setBasicStatsState(newParams, 
StatsSetupConst.FALSE);
             LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the 
partition "
-                + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." 
+ statsDesc.getPartName());
+                    + statsDesc.getDbName() + "." + statsDesc.getTableName() + 
"." + statsDesc.getPartName());
           }
           mPartition.setWriteId(writeId);
         }
@@ -12479,7 +12479,7 @@ public class ObjectStore implements RawStore, 
Configurable {
    */
   private boolean isCurrentStatsValidForTheQuery(MTable tbl, String 
queryValidWriteIdList,
       boolean isCompleteStatsWriter) throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, tbl.getParameters(), 
tbl.getWriteId(),
+    return isCurrentStatsValidForTheQuery(tbl.getParameters(), 
tbl.getWriteId(),
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
@@ -12499,19 +12499,19 @@ public class ObjectStore implements RawStore, 
Configurable {
   private boolean isCurrentStatsValidForTheQuery(MPartition part,
       String queryValidWriteIdList, boolean isCompleteStatsWriter)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, part.getParameters(), 
part.getWriteId(),
+    return isCurrentStatsValidForTheQuery(part.getParameters(), 
part.getWriteId(),
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
   private boolean isCurrentStatsValidForTheQuery(Partition part, long 
partWriteId,
       String queryValidWriteIdList, boolean isCompleteStatsWriter)
       throws MetaException {
-    return isCurrentStatsValidForTheQuery(conf, part.getParameters(), 
partWriteId,
+    return isCurrentStatsValidForTheQuery(part.getParameters(), partWriteId,
         queryValidWriteIdList, isCompleteStatsWriter);
   }
 
   // TODO: move to somewhere else
-  public static boolean isCurrentStatsValidForTheQuery(Configuration conf,
+  public static boolean isCurrentStatsValidForTheQuery(
       Map<String, String> statsParams, long statsWriteId, String 
queryValidWriteIdList,
       boolean isCompleteStatsWriter) throws MetaException {
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 7ad4bd2..182d5cc 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
@@ -205,7 +206,7 @@ public class CachedStore implements RawStore, Configurable {
    return sharedCache;
   }
 
-  static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table 
before, String catalogName,
+  static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, 
Table before, String catalogName,
                                           String dbName, String tableName, 
Partition part) throws Exception {
     ColumnStatistics colStats;
     List<String> deletedCols = new ArrayList<>();
@@ -215,32 +216,31 @@ public class CachedStore implements RawStore, 
Configurable {
       sharedCache.removePartitionColStatsFromCache(catalogName, dbName, 
tableName, part.getValues(), column);
     }
     if (colStats != null) {
-      sharedCache.updatePartitionColStatsInCache(catalogName, dbName, 
tableName, part.getValues(), colStats.getStatsObj());
+      sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, 
tableName, part.getWriteId(),
+              part.getValues(), part.getParameters(), colStats.getStatsObj());
     }
     return colStats;
   }
 
-  static private void updateStatsForTable(RawStore rawStore, Table before, 
Table after, String catalogName,
+  static private void updateStatsForAlterTable(RawStore rawStore, Table 
tblBefore, Table tblAfter, String catalogName,
                                           String dbName, String tableName) 
throws Exception {
     ColumnStatistics colStats = null;
     List<String> deletedCols = new ArrayList<>();
-    if (before.isSetPartitionKeys()) {
+    if (tblBefore.isSetPartitionKeys()) {
       List<Partition> parts = sharedCache.listCachedPartitions(catalogName, 
dbName, tableName, -1);
       for (Partition part : parts) {
-        colStats = updateStatsForPart(rawStore, before, catalogName, dbName, 
tableName, part);
+        colStats = updateStatsForAlterPart(rawStore, tblBefore, catalogName, 
dbName, tableName, part);
       }
     }
 
-    boolean needUpdateAggrStat = false;
-    List<ColumnStatisticsObj> statisticsObjs = 
HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before,
-            after,null, null, rawStore.getConf(), deletedCols);
+    List<ColumnStatisticsObj> statisticsObjs = 
HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore,
+            tblAfter,null, null, rawStore.getConf(), deletedCols);
     if (colStats != null) {
-      sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, 
statisticsObjs);
-      needUpdateAggrStat = true;
+      sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, 
tblAfter.getWriteId(),
+              statisticsObjs, tblAfter.getParameters());
     }
     for (String column : deletedCols) {
       sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, 
column);
-      needUpdateAggrStat = true;
     }
   }
 
@@ -309,10 +309,8 @@ public class CachedStore implements RawStore, Configurable 
{
           sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
                   alterPartitionMessage.getPtnObjBefore().getValues(), 
alterPartitionMessage.getPtnObjAfter());
           //TODO : Use the stat object stored in the alter table message to 
update the stats in cache.
-          if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(),
-                  catalogName, dbName, tableName, 
alterPartitionMessage.getPtnObjAfter()) != null) {
-            
CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, 
catalogName, dbName, tableName);
-          }
+          updateStatsForAlterPart(rawStore, 
alterPartitionMessage.getTableObj(),
+                  catalogName, dbName, tableName, 
alterPartitionMessage.getPtnObjAfter());
           break;
         case MessageBuilder.DROP_PARTITION_EVENT:
           DropPartitionMessage dropPartitionMessage = 
deserializer.getDropPartitionMessage(message);
@@ -329,7 +327,7 @@ public class CachedStore implements RawStore, Configurable {
           AlterTableMessage alterTableMessage = 
deserializer.getAlterTableMessage(message);
           sharedCache.alterTableInCache(catalogName, dbName, tableName, 
alterTableMessage.getTableObjAfter());
           //TODO : Use the stat object stored in the alter table message to 
update the stats in cache.
-          updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), 
alterTableMessage.getTableObjAfter(),
+          updateStatsForAlterTable(rawStore, 
alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
                   catalogName, dbName, tableName);
           break;
         case MessageBuilder.DROP_TABLE_EVENT:
@@ -371,8 +369,8 @@ public class CachedStore implements RawStore, Configurable {
           break;
         case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
           UpdateTableColumnStatMessage msg = 
deserializer.getUpdateTableColumnStatMessage(message);
-          updateTableColumnsStatsInternal(rawStore.getConf(), 
msg.getColumnStatistics(), msg.getParameters(),
-                  msg.getValidWriteIds(), msg.getWriteId());
+          sharedCache.alterTableAndStatsInCache(catalogName, dbName, 
tableName, msg.getWriteId(),
+                  msg.getColumnStatistics().getStatsObj(), 
msg.getParameters());
           break;
         case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
           DeleteTableColumnStatMessage msgDel = 
deserializer.getDeleteTableColumnStatMessage(message);
@@ -380,7 +378,8 @@ public class CachedStore implements RawStore, Configurable {
           break;
         case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
           UpdatePartitionColumnStatMessage msgPartUpdate = 
deserializer.getUpdatePartitionColumnStatMessage(message);
-          sharedCache.updatePartitionColStatsInCache(catalogName, dbName, 
tableName, msgPartUpdate.getPartVals(),
+          sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, 
tableName, msgPartUpdate.getWriteId(),
+                  msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
                   msgPartUpdate.getColumnStatistics().getStatsObj());
           break;
         case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
@@ -909,7 +908,7 @@ public class CachedStore implements RawStore, Configurable {
           
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
               StringUtils.normalizeIdentifier(dbName),
               StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
-              aggrStatsAllButDefaultPartition);
+              aggrStatsAllButDefaultPartition, null);
         }
       } catch (MetaException | NoSuchObjectException e) {
         LOG.info("Updating CachedStore: unable to read aggregate column stats 
of table: " + tblName,
@@ -948,7 +947,12 @@ public class CachedStore implements RawStore, Configurable 
{
     // the event related to the current transactions are updated in the cache 
and thus we can support strong
     // consistency in case there is only one metastore.
     if (canUseEvents) {
-      triggerUpdateUsingEvent(rawStore);
+      try {
+        triggerUpdateUsingEvent(rawStore);
+      } catch (Exception e) {
+        //TODO : Not sure how to handle it as the commit is already done in 
the object store.
+        LOG.error("Failed to update cache", e);
+      }
     }
     return true;
   }
@@ -2000,8 +2004,7 @@ public class CachedStore implements RawStore, 
Configurable {
       Map<String, String> params, long statsWriteId, String validWriteIds) 
throws MetaException {
     if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a 
txn table.
     if (areTxnStatsSupported && ((validWriteIds == null)
-        || ObjectStore.isCurrentStatsValidForTheQuery(
-            conf, params, statsWriteId, validWriteIds, false))) {
+        || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId, 
validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification 
was requested by the
       // caller, or the verification has succeeded.
       return params;
@@ -2014,14 +2017,14 @@ public class CachedStore implements RawStore, 
Configurable {
 
 
   // Note: ideally this should be above both CachedStore and ObjectStore.
-  private ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
-      Map<String, String> params, ColumnStatistics colStat, long statsWriteId,
-      String validWriteIds) throws MetaException {
+  public static ColumnStatistics adjustColStatForGet(Map<String, String> 
tableParams,
+                                               ColumnStatistics colStat, long 
statsWriteId,
+      String validWriteIds, boolean areTxnStatsSupported) throws MetaException 
{
     colStat.setIsStatsCompliant(true);
     if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a 
txn table.
     if (areTxnStatsSupported && ((validWriteIds == null)
         || ObjectStore.isCurrentStatsValidForTheQuery(
-            conf, params, statsWriteId, validWriteIds, false))) {
+            tableParams, statsWriteId, validWriteIds, false))) {
       // Valid stats are supported for txn tables, and either no verification 
was requested by the
       // caller, or the verification has succeeded.
       return colStat;
@@ -2058,7 +2061,7 @@ public class CachedStore implements RawStore, 
Configurable {
         if (errorMsg != null) {
           throw new MetaException(errorMsg);
         }
-        if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, 
table.getWriteId(),
+        if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams, 
table.getWriteId(),
                 validWriteIds, true)) {
           // Make sure we set the flag to invalid regardless of the current 
value.
           StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
@@ -2111,11 +2114,14 @@ public class CachedStore implements RawStore, 
Configurable {
       return rawStore.getTableColumnStatistics(
           catName, dbName, tblName, colNames, validWriteIds);
     }
-    ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
-    List<ColumnStatisticsObj> colStatObjs =
-        sharedCache.getTableColStatsFromCache(catName, dbName, tblName, 
colNames);
-    return adjustColStatForGet(table.getParameters(), table.getParameters(),
-        new ColumnStatistics(csd, colStatObjs), table.getWriteId(), 
validWriteIds);
+    ColumnStatistics columnStatistics =
+        sharedCache.getTableColStatsFromCache(catName, dbName, tblName, 
colNames, validWriteIds, areTxnStatsSupported);
+    if (columnStatistics == null) {
+      LOG.info("Stat of Table {}.{} for column {} is not present in cache." +
+              "Getting from raw store", dbName, tblName, colNames);
+      return rawStore.getTableColumnStatistics(catName, dbName, tblName, 
colNames, validWriteIds);
+    }
+    return columnStatistics;
   }
 
   @Override
@@ -2170,10 +2176,17 @@ public class CachedStore implements RawStore, 
Configurable {
       String catName, String dbName, String tblName, List<String> partNames,
       List<String> colNames, String writeIdList)
       throws MetaException, NoSuchObjectException {
-    // TODO: why have updatePartitionColumnStatistics cache if this is a 
bypass?
-    // Note: when implemented, this needs to call adjustColStatForGet, like 
other get methods.
-    return rawStore.getPartitionColumnStatistics(
-        catName, dbName, tblName, partNames, colNames, writeIdList);
+
+    // If writeIdList is not null, that means stats are requested within a txn 
context. So set stats compliant to false,
+    // if areTxnStatsSupported is false or the write id which has updated the 
stats is not compatible with writeIdList.
+    // This is done within table lock as the number of partitions may be more 
than one and we need a consistent view
+    // for all the partitions.
+    List<ColumnStatistics> columnStatistics = 
sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName,
+            partNames, colNames, writeIdList, areTxnStatsSupported);
+    if (columnStatistics == null) {
+      return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, 
partNames, colNames, writeIdList);
+    }
+    return columnStatistics;
   }
 
   @Override
@@ -2212,8 +2225,8 @@ public class CachedStore implements RawStore, 
Configurable {
     tblName = StringUtils.normalizeIdentifier(tblName);
     // TODO: we currently cannot do transactional checks for stats here
     //       (incl. due to lack of sync w.r.t. the below rawStore call).
-    //TODO : need to calculate aggregate locally in cached store
-    if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || 
canUseEvents) {
+    // In case the cache is updated using events, aggregate is calculated 
locally and thus can be read from cache.
+    if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null && 
!canUseEvents)) {
       return rawStore.get_aggr_stats_for(
           catName, dbName, tblName, partNames, colNames, writeIdList);
     }
@@ -2225,45 +2238,68 @@ public class CachedStore implements RawStore, 
Configurable {
     }
 
     List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, 
tblName, (short) -1);
+    StatsType type = StatsType.PARTIAL;
     if (partNames.size() == allPartNames.size()) {
       colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, 
colNames, StatsType.ALL);
       if (colStats != null) {
         return new AggrStats(colStats, partNames.size());
       }
+      type = StatsType.ALL;
     } else if (partNames.size() == (allPartNames.size() - 1)) {
       String defaultPartitionName = MetastoreConf.getVar(getConf(), 
ConfVars.DEFAULTPARTITIONNAME);
       if (!partNames.contains(defaultPartitionName)) {
-        colStats =
-            sharedCache.getAggrStatsFromCache(catName, dbName, tblName, 
colNames, StatsType.ALLBUTDEFAULT);
+        colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, 
colNames, StatsType.ALLBUTDEFAULT);
         if (colStats != null) {
           return new AggrStats(colStats, partNames.size());
         }
+        type = StatsType.ALLBUTDEFAULT;
       }
     }
+
     LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, 
parts= {}, cols= {}",
         tblName, partNames, colNames);
-    MergedColumnStatsForPartitions mergedColStats =
-        mergeColStatsForPartitions(catName, dbName, tblName, partNames, 
colNames, sharedCache);
+    MergedColumnStatsForPartitions mergedColStats = 
mergeColStatsForPartitions(catName, dbName, tblName,
+              partNames, colNames, sharedCache, type, writeIdList);
+    if (mergedColStats == null) {
+      LOG.info("Aggregate stats of partition " + 
TableName.getQualified(catName, dbName, tblName) + "." +
+              partNames + " for columns " + colNames + " is not present in 
cache. Getting it from raw store");
+      return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, 
colNames, writeIdList);
+    }
     return new AggrStats(mergedColStats.getColStats(), 
mergedColStats.getPartsFound());
   }
 
   private MergedColumnStatsForPartitions mergeColStatsForPartitions(
       String catName, String dbName, String tblName, List<String> partNames, 
List<String> colNames,
-      SharedCache sharedCache) throws MetaException {
+      SharedCache sharedCache, StatsType type, String writeIdList) throws 
MetaException {
     final boolean useDensityFunctionForNDVEstimation =
         MetastoreConf.getBoolVar(getConf(), 
ConfVars.STATS_NDV_DENSITY_FUNCTION);
     final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), 
ConfVars.STATS_NDV_TUNER);
     Map<ColumnStatsAggregator, List<ColStatsObjWithSourceInfo>> colStatsMap = 
new HashMap<>();
-    boolean areAllPartsFound = true;
-    long partsFound = 0;
+    long partsFound = partNames.size();
+    Map<List<String>, Long> partNameToWriteId = writeIdList != null ? new 
HashMap<>() : null;
     for (String colName : colNames) {
       long partsFoundForColumn = 0;
       ColumnStatsAggregator colStatsAggregator = null;
       List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new 
ArrayList<>();
       for (String partName : partNames) {
-        ColumnStatisticsObj colStatsForPart =
-            sharedCache.getPartitionColStatsFromCache(catName, dbName, 
tblName, partNameToVals(partName), colName);
-        if (colStatsForPart != null) {
+        List<String> partValue = partNameToVals(partName);
+        // There are three possible results from getPartitionColStatsFromCache.
+        // 1. The partition has valid stats and thus colStatsWriteId returned 
is a valid non-null value.
+        // 2. Partition stat is missing from cache and thus colStatsWriteId 
returned is non-null but the colstat
+        //    info in it is null. In this case we just ignore the partition 
in the aggregate calculation to keep
+        //    the behavior same as object store.
+        // 3. Partition is missing or its stat is updated by a live (not yet 
committed) or aborted txn. In this case,
+        //    colStatsWriteId is null. Thus null is returned to keep the 
behavior same as object store.
+        SharedCache.ColumStatsWithWriteId colStatsWriteId = 
sharedCache.getPartitionColStatsFromCache(catName, dbName,
+                tblName, partValue, colName, writeIdList);
+        if (colStatsWriteId == null) {
+          return null;
+        }
+        if (colStatsWriteId.getColumnStatisticsObj() != null) {
+          ColumnStatisticsObj colStatsForPart = 
colStatsWriteId.getColumnStatisticsObj();
+          if (partNameToWriteId != null) {
+            partNameToWriteId.put(partValue, colStatsWriteId.getWriteId());
+          }
           ColStatsObjWithSourceInfo colStatsWithPartInfo =
               new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, 
tblName, partName);
           colStatsWithPartInfoList.add(colStatsWithPartInfo);
@@ -2282,7 +2318,9 @@ public class CachedStore implements RawStore, 
Configurable {
       if (colStatsWithPartInfoList.size() > 0) {
         colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList);
       }
-      if (partsFoundForColumn == partNames.size()) {
+      // set partsFound to the min(partsFoundForColumn) for all columns. 
partsFound is the number of partitions, for
+      // which stats for all columns are present in the cache.
+      if (partsFoundForColumn < partsFound) {
         partsFound = partsFoundForColumn;
       }
       if (colStatsMap.size() < 1) {
@@ -2293,8 +2331,23 @@ public class CachedStore implements RawStore, 
Configurable {
     }
     // Note that enableBitVector does not apply here because 
ColumnStatisticsObj
     // itself will tell whether bitvector is null or not and aggr logic can 
automatically apply.
-    return new 
MergedColumnStatsForPartitions(MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
-        partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, 
ndvTuner), partsFound);
+    List<ColumnStatisticsObj> colAggrStats = 
MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
+            partNames, partsFound == partNames.size(), 
useDensityFunctionForNDVEstimation, ndvTuner);
+
+    if (canUseEvents) {
+      if (type == StatsType.ALL) {
+        
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+                StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), new 
AggrStats(colAggrStats, partsFound),
+                null, partNameToWriteId);
+      } else if (type == StatsType.ALLBUTDEFAULT) {
+        
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+                StringUtils.normalizeIdentifier(dbName),
+                StringUtils.normalizeIdentifier(tblName), null,
+                new AggrStats(colAggrStats, partsFound), partNameToWriteId);
+      }
+    }
+    return new MergedColumnStatsForPartitions(colAggrStats, partsFound);
   }
 
   class MergedColumnStatsForPartitions {
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index ce9e383..1c23022 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -33,13 +33,18 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.TreeMap;
 
-import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.Catalog;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
@@ -56,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static 
org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals;
 import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 public class SharedCache {
@@ -85,7 +92,7 @@ public class SharedCache {
   private static HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
 
   enum StatsType {
-    ALL(0), ALLBUTDEFAULT(1);
+    ALL(0), ALLBUTDEFAULT(1), PARTIAL(2);
 
     private final int position;
 
@@ -249,6 +256,7 @@ public class SharedCache {
         tableLock.readLock().lock();
         PartitionWrapper wrapper = 
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
         if (wrapper == null) {
+          LOG.debug("Partition: " + partVals + " is not present in the 
cache.");
           return null;
         }
         part = CacheUtils.assemble(wrapper, sharedCache);
@@ -342,6 +350,26 @@ public class SharedCache {
       }
     }
 
+    public void alterPartitionAndStats(List<String> partVals, SharedCache 
sharedCache, long writeId,
+                                       Map<String,String> parameters, 
List<ColumnStatisticsObj> colStatsObjs) {
+      try {
+        tableLock.writeLock().lock();
+        PartitionWrapper partitionWrapper = 
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+        if (partitionWrapper == null) {
+          LOG.info("Partition " + partVals + " is missing from cache. Cannot 
update the partition stats in cache.");
+          return;
+        }
+        Partition newPart = partitionWrapper.getPartition();
+        newPart.setParameters(parameters);
+        newPart.setWriteId(writeId);
+        removePartition(partVals, sharedCache);
+        cachePartition(newPart, sharedCache);
+        updatePartitionColStats(partVals, colStatsObjs);
+      } finally {
+        tableLock.writeLock().unlock();
+      }
+    }
+
     public void alterPartitions(List<List<String>> partValsList, 
List<Partition> newParts,
         SharedCache sharedCache) {
       try {
@@ -445,7 +473,9 @@ public class SharedCache {
       }
     }
 
-    public List<ColumnStatisticsObj> getCachedTableColStats(List<String> 
colNames) {
+    public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd, 
List<String> colNames,
+                                                            String 
validWriteIds, boolean areTxnStatsSupported)
+            throws MetaException {
       List<ColumnStatisticsObj> colStatObjs = new 
ArrayList<ColumnStatisticsObj>();
       try {
         tableLock.readLock().lock();
@@ -455,10 +485,11 @@ public class SharedCache {
             colStatObjs.add(colStatObj);
           }
         }
+        return CachedStore.adjustColStatForGet(getTable().getParameters(), new 
ColumnStatistics(csd, colStatObjs),
+                getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
       } finally {
         tableLock.readLock().unlock();
       }
-      return colStatObjs;
     }
 
     public void removeTableColStats(String colName) {
@@ -485,16 +516,88 @@ public class SharedCache {
       }
     }
 
-    public ColumnStatisticsObj getPartitionColStats(List<String> partVal, 
String colName) {
+    public ColumStatsWithWriteId getPartitionColStats(List<String> partVal, 
String colName, String writeIdList) {
       try {
         tableLock.readLock().lock();
-        return partitionColStatsCache
-            .get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+        ColumnStatisticsObj statisticsObj =
+                
partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, 
colName));
+        if (statisticsObj == null || writeIdList == null) {
+          return new ColumStatsWithWriteId(-1, statisticsObj);
+        }
+        PartitionWrapper wrapper = 
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+        if (wrapper == null) {
+          LOG.info("Partition: " + partVal + " is not present in the cache. 
Cannot update stats in cache.");
+          return null;
+        }
+        long writeId = wrapper.getPartition().getWriteId();
+        ValidWriteIdList list4TheQuery = new 
ValidReaderWriteIdList(writeIdList);
+        // Just check if the write ID is valid. If it's valid (i.e. we are 
allowed to see it),
+        // that means it cannot possibly be a concurrent write. If it's not 
valid (we are not
+        // allowed to see it), that means it's either concurrent or aborted, 
same thing for us.
+        if (!list4TheQuery.isWriteIdValid(writeId)) {
+          LOG.debug("Write id list " + writeIdList + " is not compatible with 
write id " + writeId);
+          return null;
+        }
+        return new ColumStatsWithWriteId(writeId, statisticsObj);
       } finally {
         tableLock.readLock().unlock();
       }
     }
 
+    public List<ColumnStatistics> getPartColStatsList(List<String> partNames, 
List<String> colNames,
+                                              String writeIdList, boolean 
txnStatSupported) throws MetaException {
+      List<ColumnStatistics> colStatObjs = new ArrayList<>();
+      try {
+        tableLock.readLock().lock();
+        Table tbl = getTable();
+        for (String partName : partNames) {
+          ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false,
+                  tbl.getDbName(), tbl.getTableName());
+          csd.setCatName(tbl.getCatName());
+          csd.setPartName(partName);
+          csd.setLastAnalyzed(0); //TODO : Need to get last analysed. This is 
not being used by anybody now.
+          List<ColumnStatisticsObj> statObject = new ArrayList<>();
+          List<String> partVal =  
Warehouse.getPartValuesFromPartName(partName);
+          for (String colName : colNames) {
+            ColumnStatisticsObj statisticsObj =
+                    
partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal, 
colName));
+            if (statisticsObj != null) {
+              statObject.add(statisticsObj);
+            } else {
+              LOG.info("Stats not available in cachedStore for col " + colName 
+ " in partition " + partVal);
+              return null;
+            }
+          }
+          ColumnStatistics columnStatistics = new ColumnStatistics(csd, 
statObject);
+          if (writeIdList != null && 
TxnUtils.isTransactionalTable(getParameters())) {
+            columnStatistics.setIsStatsCompliant(true);
+            if (!txnStatSupported) {
+              columnStatistics.setIsStatsCompliant(false);
+            } else {
+              PartitionWrapper wrapper =
+                      
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+              if (wrapper == null) {
+                columnStatistics.setIsStatsCompliant(false);
+              } else {
+                Partition partition = wrapper.getPartition();
+                if 
(!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(),
+                        partition.getWriteId(), writeIdList, false)) {
+                  LOG.debug("The current cached store transactional partition 
column statistics for {}.{}.{} "
+                                  + "(write ID {}) are not valid for current 
query ({})", tbl.getDbName(),
+                          tbl.getTableName(), partName, 
partition.getWriteId(), writeIdList);
+                  columnStatistics.setIsStatsCompliant(false);
+                }
+              }
+            }
+          }
+          colStatObjs.add(columnStatistics);
+        }
+      } finally {
+        tableLock.readLock().unlock();
+      }
+      return colStatObjs;
+    }
+
     public boolean updatePartitionColStats(List<String> partVal,
         List<ColumnStatisticsObj> colStatsObjs) {
       try {
@@ -661,11 +764,30 @@ public class SharedCache {
     }
 
     public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
-        AggrStats aggrStatsAllButDefaultPartition) {
+        AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache, 
Map<List<String>, Long> partNameToWriteId) {
       Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
           new HashMap<String, List<ColumnStatisticsObj>>();
       try {
         tableLock.writeLock().lock();
+        if (partNameToWriteId != null) {
+          for (Entry<List<String>, Long> partValuesWriteIdSet : 
partNameToWriteId.entrySet()) {
+            List<String> partValues = partValuesWriteIdSet.getKey();
+            Partition partition = getPartition(partValues, sharedCache);
+            if (partition == null) {
+              LOG.info("Could not refresh the aggregate stat as partition " + 
partValues + " does not exist");
+              return;
+            }
+
+            // For txn tables, a changed write id means the partition was 
updated after the stats were fetched. So
+            // skip updating the aggregate stats in the cache.
+            long writeId = partition.getWriteId();
+            if (writeId != partValuesWriteIdSet.getValue()) {
+              LOG.info("Could not refresh the aggregate stat as partition " + 
partValues + " has write id " +
+                      partValuesWriteIdSet.getValue() + " instead of " + 
writeId);
+              return;
+            }
+          }
+        }
         if (aggrStatsAllPartitions != null) {
           for (ColumnStatisticsObj statObj : 
aggrStatsAllPartitions.getColStats()) {
             if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
@@ -794,6 +916,23 @@ public class SharedCache {
     }
   }
 
+  public static class ColumStatsWithWriteId {
+    private long writeId;
+    private ColumnStatisticsObj columnStatisticsObj;
+    public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj 
columnStatisticsObj) {
+      this.writeId = writeId;
+      this.columnStatisticsObj = columnStatisticsObj;
+    }
+
+    public long getWriteId() {
+      return writeId;
+    }
+
+    public ColumnStatisticsObj getColumnStatisticsObj() {
+      return columnStatisticsObj;
+    }
+  }
+
   public void populateCatalogsInCache(Collection<Catalog> catalogs) {
     for (Catalog cat : catalogs) {
       Catalog catCopy = cat.deepCopy();
@@ -1205,6 +1344,30 @@ public class SharedCache {
     }
   }
 
+  public void alterTableAndStatsInCache(String catName, String dbName, String 
tblName, long writeId,
+                                        List<ColumnStatisticsObj> 
colStatsObjs, Map<String,String> newParams) {
+    try {
+      cacheLock.writeLock().lock();
+      TableWrapper tblWrapper =
+              tableCache.remove(CacheUtils.buildTableKey(catName, dbName, 
tblName));
+      if (tblWrapper == null) {
+        LOG.info("Table " + tblName + " is missing from cache. Cannot update 
table stats in cache");
+        return;
+      }
+      Table newTable = tblWrapper.getTable();
+      newTable.setWriteId(writeId);
+      newTable.setParameters(newParams);
+      //tblWrapper.updateTableObj(newTable, this);
+      String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+      String newTblName = 
StringUtils.normalizeIdentifier(newTable.getTableName());
+      tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), 
tblWrapper);
+      tblWrapper.updateTableColStats(colStatsObjs);
+      isTableCacheDirty.set(true);
+    } finally {
+      cacheLock.writeLock().unlock();
+    }
+  }
+
   public List<Table> listCachedTables(String catName, String dbName) {
     List<Table> tables = new ArrayList<>();
     try {
@@ -1299,19 +1462,20 @@ public class SharedCache {
     }
   }
 
-  public List<ColumnStatisticsObj> getTableColStatsFromCache(String catName, 
String dbName,
-      String tblName, List<String> colNames) {
-    List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
+  public ColumnStatistics getTableColStatsFromCache(String catName, String 
dbName,
+      String tblName, List<String> colNames, String validWriteIds, boolean 
areTxnStatsSupported) throws MetaException {
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
-      if (tblWrapper != null) {
-        colStatObjs = tblWrapper.getCachedTableColStats(colNames);
+      if (tblWrapper == null) {
+        LOG.info("Table " + tblName + " is missing from cache.");
+        return null;
       }
+      ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, 
tblName);
+      return tblWrapper.getCachedTableColStats(csd, colNames, validWriteIds, 
areTxnStatsSupported);
     } finally {
       cacheLock.readLock().unlock();
     }
-    return colStatObjs;
   }
 
   public void removeTableColStatsFromCache(String catName, String dbName, 
String tblName,
@@ -1321,6 +1485,8 @@ public class SharedCache {
       TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeTableColStats(colName);
+      } else {
+        LOG.info("Table " + tblName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1333,6 +1499,8 @@ public class SharedCache {
       TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.removeAllTableColStats();
+      } else {
+        LOG.info("Table " + tblName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1347,6 +1515,8 @@ public class SharedCache {
           tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.updateTableColStats(colStatsForTable);
+      } else {
+        LOG.info("Table " + tableName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1361,6 +1531,8 @@ public class SharedCache {
           tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
       if (tblWrapper != null) {
         tblWrapper.refreshTableColStats(colStatsForTable);
+      } else {
+        LOG.info("Table " + tableName + " is missing from cache.");
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1513,6 +1685,20 @@ public class SharedCache {
     }
   }
 
+  public void alterPartitionAndStatsInCache(String catName, String dbName, 
String tblName, long writeId,
+                                            List<String> partVals, 
Map<String,String> parameters,
+                                            List<ColumnStatisticsObj> 
colStatsObjs) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper != null) {
+        tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, 
colStatsObjs);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+  }
+
   public void alterPartitionsInCache(String catName, String dbName, String 
tblName,
       List<List<String>> partValsList, List<Partition> newParts) {
     try {
@@ -1578,14 +1764,14 @@ public class SharedCache {
     }
   }
 
-  public ColumnStatisticsObj getPartitionColStatsFromCache(String catName, 
String dbName,
-      String tblName, List<String> partVal, String colName) {
-    ColumnStatisticsObj colStatObj = null;
+  public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, 
String dbName,
+      String tblName, List<String> partVal, String colName, String 
writeIdList) {
+    ColumStatsWithWriteId colStatObj = null;
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
-        colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
+        colStatObj = tblWrapper.getPartitionColStats(partVal, colName, 
writeIdList);
       }
     } finally {
       cacheLock.readLock().unlock();
@@ -1593,6 +1779,24 @@ public class SharedCache {
     return colStatObj;
   }
 
+  public List<ColumnStatistics> getPartitionColStatsListFromCache(String 
catName, String dbName, String tblName,
+                                                                  List<String> 
partNames, List<String> colNames,
+                                                                  String 
writeIdList, boolean txnStatSupported) {
+    List<ColumnStatistics> colStatObjs = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+      if (tblWrapper != null) {
+        colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, 
writeIdList, txnStatSupported);
+      }
+    } catch (MetaException e) {
+      LOG.warn("Failed to get partition column statistics");
+    } finally {
+      cacheLock.readLock().unlock();
+    }
+    return colStatObjs;
+  }
+
   public void refreshPartitionColStatsInCache(String catName, String dbName, 
String tblName,
       List<ColumnStatistics> partitionColStats) {
     try {
@@ -1635,13 +1839,14 @@ public class SharedCache {
   }
 
   public void refreshAggregateStatsInCache(String catName, String dbName, 
String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats 
aggrStatsAllButDefaultPartition) {
+      AggrStats aggrStatsAllPartitions, AggrStats 
aggrStatsAllButDefaultPartition,
+                                           Map<List<String>, Long> 
partNameToWriteId) {
     try {
       cacheLock.readLock().lock();
       TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
       if (tblWrapper != null) {
         tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
-            aggrStatsAllButDefaultPartition);
+            aggrStatsAllButDefaultPartition, this, partNameToWriteId);
       }
     } finally {
       cacheLock.readLock().unlock();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
index 094f799..ba61a08 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
@@ -35,7 +35,6 @@ import java.util.Map;
 @InterfaceStability.Stable
 public class UpdatePartitionColumnStatEvent extends ListenerEvent {
   private ColumnStatistics partColStats;
-  private String validWriteIds;
   private long writeId;
   private Map<String, String> parameters;
   private List<String> partVals;
@@ -45,16 +44,14 @@ public class UpdatePartitionColumnStatEvent extends 
ListenerEvent {
    * @param statsObj Columns statistics Info.
    * @param partVals partition names
    * @param parameters table parameters to be updated after stats are updated.
-   * @param validWriteIds valid write id list for the query.
+   * @param tableObj table object
    * @param writeId writeId for the query.
    * @param handler handler that is firing the event
    */
   public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj, 
List<String> partVals, Map<String, String> parameters,
-                                        Table tableObj, String validWriteIds, 
long writeId,
-                                        IHMSHandler handler) {
+                                        Table tableObj, long writeId, 
IHMSHandler handler) {
     super(true, handler);
     this.partColStats = statsObj;
-    this.validWriteIds = validWriteIds;
     this.writeId = writeId;
     this.parameters = parameters;
     this.partVals = partVals;
@@ -71,7 +68,6 @@ public class UpdatePartitionColumnStatEvent extends 
ListenerEvent {
     super(true, handler);
     this.partColStats = statsObj;
     this.partVals = partVals;
-    this.validWriteIds = null;
     this.writeId = 0;
     this.parameters = null;
     this.tableObj = tableObj;
@@ -81,10 +77,6 @@ public class UpdatePartitionColumnStatEvent extends 
ListenerEvent {
     return partColStats;
   }
 
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
   public long getWriteId() {
     return writeId;
   }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
index 3f988bb..71300ab 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
@@ -35,24 +35,22 @@ import java.util.Map;
 @InterfaceStability.Stable
 public class UpdateTableColumnStatEvent extends ListenerEvent {
   private ColumnStatistics colStats;
-  private String validWriteIds;
   private long writeId;
   private Map<String, String> parameters;
   private Table tableObj;
 
   /**
    * @param colStats Columns statistics Info.
+   * @param tableObj table object
    * @param parameters table parameters to be updated after stats are updated.
-   * @param validWriteIds valid write id list for the query.
-   * @param colStats writeId for the query.
+   * @param writeId writeId for the query.
    * @param handler handler that is firing the event
    */
   public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj,
-                                    Map<String, String> parameters, String 
validWriteIds,
+                                    Map<String, String> parameters,
                                     long writeId, IHMSHandler handler) {
     super(true, handler);
     this.colStats = colStats;
-    this.validWriteIds = validWriteIds;
     this.writeId = writeId;
     this.parameters = parameters;
     this.tableObj = tableObj;
@@ -65,7 +63,6 @@ public class UpdateTableColumnStatEvent extends ListenerEvent 
{
   public UpdateTableColumnStatEvent(ColumnStatistics colStats, IHMSHandler 
handler) {
     super(true, handler);
     this.colStats = colStats;
-    this.validWriteIds = null;
     this.writeId = 0;
     this.parameters = null;
     this.tableObj = null;
@@ -75,10 +72,6 @@ public class UpdateTableColumnStatEvent extends 
ListenerEvent {
     return colStats;
   }
 
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
   public long getWriteId() {
     return writeId;
   }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
index 10c6b44..15c4769 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
@@ -289,9 +289,9 @@ public class MessageBuilder {
   public JSONUpdateTableColumnStatMessage 
buildUpdateTableColumnStatMessage(ColumnStatistics colStats,
                                                                             
Table tableObj,
                                                                             
Map<String, String> parameters,
-                                                                            
String validWriteIds, long writeId) {
+                                                                            
long writeId) {
     return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL, 
MS_SERVICE_PRINCIPAL, now(),
-            colStats, tableObj, parameters, validWriteIds, writeId);
+            colStats, tableObj, parameters, writeId);
   }
 
   public JSONDeleteTableColumnStatMessage 
buildDeleteTableColumnStatMessage(String dbName, String colName) {
@@ -300,9 +300,9 @@ public class MessageBuilder {
 
   public JSONUpdatePartitionColumnStatMessage 
buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats,
                                                             List<String> 
partVals, Map<String, String> parameters,
-                                                            Table tableObj, 
String validWriteIds, long writeId) {
+                                                            Table tableObj, 
long writeId) {
     return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL, 
MS_SERVICE_PRINCIPAL, now(), colStats, partVals,
-            parameters, tableObj, validWriteIds, writeId);
+            parameters, tableObj, writeId);
   }
 
   public JSONDeletePartitionColumnStatMessage 
buildDeletePartitionColumnStatMessage(String dbName, String colName,
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
index 7eb6c07..e92a0dc 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
@@ -34,8 +34,6 @@ public abstract class UpdatePartitionColumnStatMessage 
extends EventMessage {
 
   public abstract ColumnStatistics getColumnStatistics();
 
-  public abstract String getValidWriteIds();
-
   public abstract Long getWriteId();
 
   public abstract Map<String, String> getParameters();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
index 7919b0e..e3f049c 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
@@ -33,8 +33,6 @@ public abstract class UpdateTableColumnStatMessage extends 
EventMessage {
 
   public abstract ColumnStatistics getColumnStatistics();
 
-  public abstract String getValidWriteIds();
-
   public abstract Long getWriteId();
 
   public abstract Map<String, String> getParameters();
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
index 1b35df5..fd7fe00 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
@@ -38,7 +38,7 @@ public class JSONUpdatePartitionColumnStatMessage extends 
UpdatePartitionColumnS
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String validWriteIds, server, servicePrincipal, database;
+  private String server, servicePrincipal, database;
 
   @JsonProperty
   private String colStatsJson;
@@ -61,12 +61,11 @@ public class JSONUpdatePartitionColumnStatMessage extends 
UpdatePartitionColumnS
   public JSONUpdatePartitionColumnStatMessage(String server, String 
servicePrincipal, Long timestamp,
                                               ColumnStatistics colStats, 
List<String> partVals,
                                               Map<String, String> parameters,
-                                              Table tableObj, String 
validWriteIds, long writeId) {
+                                              Table tableObj, long writeId) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
-    this.validWriteIds = validWriteIds;
     this.database = colStats.getStatsDesc().getDbName();
     this.partVals = partVals;
     try {
@@ -108,11 +107,6 @@ public class JSONUpdatePartitionColumnStatMessage extends 
UpdatePartitionColumnS
   }
 
   @Override
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
-  @Override
   public Long getWriteId() {
     return writeId;
   }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
index c932b7c..275d204 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
@@ -36,7 +36,7 @@ public class JSONUpdateTableColumnStatMessage extends 
UpdateTableColumnStatMessa
   private Long writeId, timestamp;
 
   @JsonProperty
-  private String validWriteIds, server, servicePrincipal, database;
+  private String server, servicePrincipal, database;
 
   @JsonProperty
   private String colStatsJson;
@@ -55,12 +55,11 @@ public class JSONUpdateTableColumnStatMessage extends 
UpdateTableColumnStatMessa
 
   public JSONUpdateTableColumnStatMessage(String server, String 
servicePrincipal, Long timestamp,
                       ColumnStatistics colStats, Table tableObj, Map<String, 
String> parameters,
-                                          String validWriteIds, long writeId) {
+                                           long writeId) {
     this.timestamp = timestamp;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
     this.writeId = writeId;
-    this.validWriteIds = validWriteIds;
     this.database = colStats.getStatsDesc().getDbName();
     try {
       this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats);
@@ -106,11 +105,6 @@ public class JSONUpdateTableColumnStatMessage extends 
UpdateTableColumnStatMessa
   }
 
   @Override
-  public String getValidWriteIds() {
-    return validWriteIds;
-  }
-
-  @Override
   public Long getWriteId() {
     return writeId;
   }
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
 
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 2f102a2..02ff4ae 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -177,6 +177,8 @@ CREATE TABLE "APP"."NOTIFICATION_LOG" (
     "MESSAGE_FORMAT" VARCHAR(16)
 );
 
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON 
"APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
 CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL, 
"NEXT_EVENT_ID" BIGINT NOT NULL);
 
 CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT, 
"CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT , 
"PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL,  "POSITION" 
BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE" 
SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT, 
"ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400));
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
 
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 8ad56fc..1a1e34a 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -9,6 +9,9 @@ UPDATE "APP"."WM_RESOURCEPLAN" SET NS = 'default' WHERE NS IS 
NULL;
 DROP INDEX "APP"."UNIQUE_WM_RESOURCEPLAN";
 CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN" 
("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON 
"APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 383d3bc..4f58343 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -629,6 +629,8 @@ CREATE TABLE NOTIFICATION_LOG
 
 ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY 
(NL_ID);
 
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
 CREATE TABLE NOTIFICATION_SEQUENCE
 (
     NNI_ID bigint NOT NULL,
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index edde08d..e0d143a 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -10,6 +10,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
 DROP INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN;
 CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN ("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 5466537..8db11d3 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -871,6 +871,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
     PRIMARY KEY (`NL_ID`)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG 
(`EVENT_ID`) USING BTREE;
+
 CREATE TABLE IF NOT EXISTS `NOTIFICATION_SEQUENCE`
 (
     `NNI_ID` BIGINT(20) NOT NULL,
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
 
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 701acb0..47c3831 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -11,6 +11,9 @@ UPDATE `WM_RESOURCEPLAN` SET `NS` = 'default' WHERE `NS` IS 
NULL;
 ALTER TABLE `WM_RESOURCEPLAN` DROP KEY `UNIQUE_WM_RESOURCEPLAN`;
 ALTER TABLE `WM_RESOURCEPLAN` ADD UNIQUE KEY `UNIQUE_WM_RESOURCEPLAN` (`NAME`, 
`NS`);
 
+-- HIVE-21063
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG 
(`EVENT_ID`) USING BTREE;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS ' ';
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
 
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index 2a9c38f..8af9a76 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -624,6 +624,8 @@ CREATE TABLE NOTIFICATION_LOG
 
 ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY 
(NL_ID);
 
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
 CREATE TABLE NOTIFICATION_SEQUENCE
 (
     NNI_ID NUMBER NOT NULL,
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
 
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index b9f6331..231376b 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -9,6 +9,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
 DROP INDEX UNIQUE_WM_RESOURCEPLAN;
 CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN (NS, "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release 
version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status 
from dual;
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
 
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 0a359d9..2aff1e4 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -631,6 +631,8 @@ CREATE TABLE "NOTIFICATION_LOG"
     PRIMARY KEY ("NL_ID")
 );
 
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING 
btree ("EVENT_ID");
+
 CREATE TABLE "NOTIFICATION_SEQUENCE"
 (
     "NNI_ID" BIGINT NOT NULL,
diff --git 
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
 
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 0c36069..2d4363b 100644
--- 
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ 
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -11,6 +11,9 @@ UPDATE "WM_RESOURCEPLAN" SET "NS" = 'default' WHERE "NS" IS 
NULL;
 ALTER TABLE "WM_RESOURCEPLAN" DROP CONSTRAINT "UNIQUE_WM_RESOURCEPLAN";
 ALTER TABLE ONLY "WM_RESOURCEPLAN" ADD CONSTRAINT "UNIQUE_WM_RESOURCEPLAN" 
UNIQUE ("NS", "NAME");
 
+-- HIVE-21063
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING 
btree ("EVENT_ID");
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release 
version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 7429d18..77e0c98 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -95,6 +95,7 @@ import 
org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.Type;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
 import org.junit.Test;
@@ -3325,22 +3326,23 @@ public abstract class TestHiveMetaStore {
     Warehouse wh = mock(Warehouse.class);
     //Execute initializeAddedPartition() and it should not trigger 
updatePartitionStatsFast() as DO_NOT_UPDATE_STATS is true
     HiveMetaStore.HMSHandler hms = new HiveMetaStore.HMSHandler("", conf, 
false);
-    Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", 
Table.class, Partition.class, boolean.class);
+    Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition", 
Table.class, Partition.class,
+            boolean.class, EnvironmentContext.class);
     m.setAccessible(true);
     //Invoke initializeAddedPartition();
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
 
     //Remove tbl's DO_NOT_UPDATE_STATS & set STATS_AUTO_GATHER = false
     tbl.unsetParameters();
     MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, false);
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
 
     //Set STATS_AUTO_GATHER = true and set tbl as a VIRTUAL_VIEW
     MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, true);
     tbl.setTableType("VIRTUAL_VIEW");
-    m.invoke(hms, tbl, part, false);
+    m.invoke(hms, tbl, part, false, null);
     verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
   }
 }

Reply via email to