This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0223cbf HIVE-21063 : Support statistics in cachedStore for transactional table. (Mahesh Kumar Behera, reviewed by Sankar Hariappan, Daniel Dai)
0223cbf is described below
commit 0223cbf15ae2b27d4879627760a4dab4d7cc713f
Author: Mahesh Kumar Behera <[email protected]>
AuthorDate: Fri Feb 8 11:51:00 2019 +0530
HIVE-21063 : Support statistics in cachedStore for transactional table. (Mahesh Kumar Behera, reviewed by Sankar Hariappan, Daniel Dai)
---
.../hcatalog/listener/DbNotificationListener.java | 4 +-
.../TestCachedStoreUpdateUsingEvents.java | 714 ++++++++++++++++++---
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 4 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 25 +-
.../apache/hadoop/hive/metastore/ObjectStore.java | 10 +-
.../hadoop/hive/metastore/cache/CachedStore.java | 157 +++--
.../hadoop/hive/metastore/cache/SharedCache.java | 245 ++++++-
.../events/UpdatePartitionColumnStatEvent.java | 12 +-
.../events/UpdateTableColumnStatEvent.java | 13 +-
.../hive/metastore/messaging/MessageBuilder.java | 8 +-
.../UpdatePartitionColumnStatMessage.java | 2 -
.../messaging/UpdateTableColumnStatMessage.java | 2 -
.../json/JSONUpdatePartitionColumnStatMessage.java | 10 +-
.../json/JSONUpdateTableColumnStatMessage.java | 10 +-
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 2 +
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 3 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 2 +
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 3 +
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 2 +
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 3 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 2 +
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 3 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 2 +
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 3 +
.../hadoop/hive/metastore/TestHiveMetaStore.java | 10 +-
25 files changed, 1004 insertions(+), 247 deletions(-)
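The heart of the change: column statistics for a transactional table are now written together with the writer's writeId and validated against the reader's ValidWriteIdList before CachedStore serves them. A minimal sketch of the write path, assembled only from calls that appear in the patch below (variable names are illustrative, not part of the patch):

    // Open a txn and allocate a table write id, as the new test helpers do.
    List<Long> txnIds = hmsHandler.open_txns(new OpenTxnRequest(1, "user", "host")).getTxn_ids();
    AllocateTableWriteIdsRequest widReq = new AllocateTableWriteIdsRequest(dbName, tblName);
    widReq.setTxnIds(txnIds);
    long writeId = hmsHandler.allocate_table_write_ids(widReq).getTxnToWriteIds().get(0).getWriteId();

    // Attach the writer's writeId and the reader-visible write-id list to the
    // stats request; the metastore uses them to judge stats compliance later.
    SetPartitionsStatsRequest req = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
    req.setWriteId(writeId);
    req.setValidWriteIdList(validWriteIds);
    hmsHandler.update_table_column_statistics_req(req);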
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 8404e3e..963b227 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -759,7 +759,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
.buildUpdateTableColumnStatMessage(updateTableColumnStatEvent.getColStats(),
updateTableColumnStatEvent.getTableObj(),
updateTableColumnStatEvent.getTableParameters(),
- updateTableColumnStatEvent.getValidWriteIds(), updateTableColumnStatEvent.getWriteId());
+ updateTableColumnStatEvent.getWriteId());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_TABLE_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updateTableColumnStatEvent.getColStats().getStatsDesc();
@@ -789,7 +789,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
updatePartColStatEvent.getPartVals(),
updatePartColStatEvent.getPartParameters(),
updatePartColStatEvent.getTableObj(),
- updatePartColStatEvent.getValidWriteIds(), updatePartColStatEvent.getWriteId());
+ updatePartColStatEvent.getWriteId());
NotificationEvent event = new NotificationEvent(0, now(), EventType.UPDATE_PARTITION_COLUMN_STAT.toString(),
msgEncoder.getSerializer().serialize(msg));
ColumnStatisticsDesc statDesc = updatePartColStatEvent.getPartColStats().getStatsDesc();
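Note that both hunks above drop getValidWriteIds() from the serialized stat messages; only the writer's writeId travels with the notification. A consumer replaying such an event (as CachedStore does in a later hunk of this patch) then applies stats, parameters and writeId in one shot; sketch based on the CachedStore hunk below:

    // UPDATE_TBL_COL_STAT_EVENT replay: push the stats plus table parameters
    // and the writeId into the shared cache together.
    UpdateTableColumnStatMessage msg = deserializer.getUpdateTableColumnStatMessage(message);
    sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, msg.getWriteId(),
        msg.getColumnStatistics().getStatsObj(), msg.getParameters());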
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
index 83f12a5..cdfc60c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestCachedStoreUpdateUsingEvents.java
@@ -4,6 +4,8 @@ import java.util.*;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.*;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -12,7 +14,10 @@ import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
import org.junit.Assert;
@@ -29,6 +34,7 @@ public class TestCachedStoreUpdateUsingEvents {
private SharedCache sharedCache;
private Configuration conf;
private HiveMetaStore.HMSHandler hmsHandler;
+ private String[] colType = new String[] {"double", "string"};
@Before
public void setUp() throws Exception {
@@ -39,11 +45,14 @@ public class TestCachedStoreUpdateUsingEvents {
MetastoreConf.setVar(conf, ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
DbNotificationListener.class.getName());
MetastoreConf.setVar(conf, ConfVars.RAW_STORE_IMPL,
"org.apache.hadoop.hive.metastore.cache.CachedStore");
MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true);
+ MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true);
+ MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false);
MetaStoreTestUtils.setConfForStandloneMode(conf);
hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true);
- rawStore = hmsHandler.getMS();
+ rawStore = new ObjectStore();
+ rawStore.setConf(hmsHandler.getConf());
sharedCache = CachedStore.getSharedCache();
// Stop the CachedStore cache update service. We'll start it explicitly to control the test
@@ -69,10 +78,14 @@ public class TestCachedStoreUpdateUsingEvents {
String serdeLocation = "file:/tmp";
Map<String, String> serdeParams = new HashMap<>();
SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>());
- StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0,
+ StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation,
+ null, null, false, 3,
serdeInfo, null, null, serdeParams);
+ sd.setInputFormat(OrcInputFormat.class.getName());
+ sd.setOutputFormat(OrcOutputFormat.class.getName());
sd.setStoredAsSubDirectories(false);
- Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null,
+ Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams,
+ null, null,
TableType.MANAGED_TABLE.toString());
tbl.setCatName(DEFAULT_CATALOG_NAME);
return tbl;
@@ -186,6 +199,10 @@ public class TestCachedStoreUpdateUsingEvents {
long lastEventId = -1;
RawStore rawStore = hmsHandler.getMS();
+ // Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
+ CachedStore.prewarm(rawStore);
+
// Add a db via rawStore
String dbName = "test_table_ops";
String dbOwner = "user1";
@@ -193,10 +210,6 @@ public class TestCachedStoreUpdateUsingEvents {
hmsHandler.create_database(db);
db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
- // Prewarm CachedStore
- CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
-
// Add a table via rawStore
String tblName = "tbl";
String tblOwner = "user1";
@@ -263,6 +276,10 @@ public class TestCachedStoreUpdateUsingEvents {
long lastEventId = -1;
RawStore rawStore = hmsHandler.getMS();
+ // Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
+ CachedStore.prewarm(rawStore);
+
// Add a db via rawStore
String dbName = "test_partition_ops";
String dbOwner = "user1";
@@ -285,21 +302,19 @@ public class TestCachedStoreUpdateUsingEvents {
hmsHandler.create_table(tbl);
tbl = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
- // Prewarm CachedStore
- CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
-
final String ptnColVal1 = "aaa";
Map<String, String> partParams = new HashMap<String, String>();
Partition ptn1 =
- new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+ new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0,
+ 0, tbl.getSd(), partParams);
ptn1.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn1);
ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
final String ptnColVal2 = "bbb";
Partition ptn2 =
- new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+ new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0,
+ 0, tbl.getSd(), partParams);
ptn2.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn2);
ptn2 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2));
@@ -307,17 +322,21 @@ public class TestCachedStoreUpdateUsingEvents {
// Read database, table, partition via CachedStore
Database dbRead = sharedCache.getDatabaseFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase());
Assert.assertEquals(db, dbRead);
- Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase());
+ Table tblRead = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+ dbName.toLowerCase(), tblName.toLowerCase());
compareTables(tbl, tblRead);
- Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
+ Partition ptn1Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+ dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal1));
comparePartitions(ptn1, ptn1Read);
- Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(), dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2));
+ Partition ptn2Read = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME.toLowerCase(),
+ dbName.toLowerCase(), tblName.toLowerCase(), Arrays.asList(ptnColVal2));
comparePartitions(ptn2, ptn2Read);
// Add a new partition via rawStore
final String ptnColVal3 = "ccc";
Partition ptn3 =
- new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+ new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0,
+ 0, tbl.getSd(), partParams);
ptn3.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.add_partition(ptn3);
ptn3 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
@@ -326,7 +345,8 @@ public class TestCachedStoreUpdateUsingEvents {
ptn1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1));
final String ptnColVal1Alt = "aaa";
Partition ptn1Atl =
- new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams);
+ new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0,
+ 0, tbl.getSd(), partParams);
ptn1Atl.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.alter_partitions(dbName, tblName, Arrays.asList(ptn1Atl));
ptn1Atl = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt));
@@ -336,7 +356,8 @@ public class TestCachedStoreUpdateUsingEvents {
hmsHandler.drop_partition(dbName, tblName, Arrays.asList(ptnColVal2), false);
// Read the newly added partition via CachedStore
- Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3));
+ Partition ptnRead = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName,
+ tblName, Arrays.asList(ptnColVal3));
comparePartitions(ptn3, ptnRead);
// Read the altered partition via CachedStore
@@ -362,49 +383,116 @@ public class TestCachedStoreUpdateUsingEvents {
sharedCache.getSdCache().clear();
}
- @Test
- public void testTableColumnStatistics() throws Throwable {
- String dbName = "column_stats_test_db";
- String tblName = "tbl";
- String typeName = "person";
- String tblOwner = "testowner";
- int lastAccessed = 6796;
- String dbOwner = "user1";
+ private void updateTableColStats(String dbName, String tblName, String[] colName,
+ double highValue, double avgColLen, boolean isTxnTable) throws Throwable {
+ long writeId = -1;
+ String validWriteIds = null;
+ if (isTxnTable) {
+ writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId();
+ validWriteIds = getValidWriteIds(dbName, tblName);
+ }
- // Add a db via rawStore
- Database db = createTestDb(dbName, dbOwner);
- hmsHandler.create_database(db);
- db = rawStore.getDatabase(DEFAULT_CATALOG_NAME, dbName);
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setIsTblLevel(true);
+ statsDesc.setPartName(null);
- Map<String, String> tableParams = new HashMap<>();
- tableParams.put("test_param_1", "hi");
- tableParams.put("test_param_2", "50");
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
- // Add a table via rawStore
- List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("income", "int", "integer column"));
- cols.add(new FieldSchema("name", "string", "string column"));
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
- List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
- ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
- ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+ // write stats objs persistently
+ hmsHandler.update_table_column_statistics_req(setTblColStat);
+ validateTablePara(dbName, tblName);
+
+ ColumnStatistics colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, Lists.newArrayList(colName[0]), validWriteIds, true);
+ Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[0]);
+ verifyStatDouble(colStatsCache.getStatsObj().get(0), colName[0], highValue);
+
+ colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, Lists.newArrayList(colName[1]), validWriteIds, true);
+ Assert.assertEquals(colStatsCache.getStatsObj().get(0).getColName(), colName[1]);
+ verifyStatString(colStatsCache.getStatsObj().get(0), colName[1], avgColLen);
+ }
- Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams);
- hmsHandler.create_table(tbl);
+ private void updatePartColStats(String dbName, String tblName, boolean isTxnTable, String[] colName,
+ String partName, double highValue, double avgColLen) throws Throwable {
+ long writeId = -1;
+ String validWriteIds = null;
+ List<Long> txnIds = null;
- // Prewarm CachedStore
- CachedStore.setCachePrewarmedState(false);
- CachedStore.prewarm(rawStore);
+ if (isTxnTable) {
+ txnIds = allocateTxns(1);
+ writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ validWriteIds = getValidWriteIds(dbName, tblName);
+ }
- // Create a ColumnStatistics Obj
- String[] colName = new String[]{"income", "name"};
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setIsTblLevel(false);
+ statsDesc.setPartName(partName);
+
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
+
+ // write stats objs persistently
+ hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+ if (isTxnTable) {
+ CommitTxnRequest rqst = new CommitTxnRequest(txnIds.get(0));
+ hmsHandler.commit_txn(rqst);
+ writeId = allocateWriteIds(allocateTxns(1), dbName, tblName).get(0).getWriteId();
+ validWriteIds = getValidWriteIds(dbName, tblName);
+ }
+
+ Deadline.startTimer("getPartitionColumnStatistics");
+ List<ColumnStatistics> statRowStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+ Deadline.stopTimer();
+ verifyStatString(statRowStore.get(0).getStatsObj().get(0), colName[1], avgColLen);
+ if (isTxnTable) {
+ Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), true);
+ } else {
+ Assert.assertEquals(statRowStore.get(0).isIsStatsCompliant(), false);
+ }
+
+ List<ColumnStatistics> statSharedCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+ validWriteIds, true);
+ verifyStatString(statSharedCache.get(0).getStatsObj().get(0), colName[1], avgColLen);
+ if (isTxnTable) {
+ Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), true);
+ } else {
+ Assert.assertEquals(statSharedCache.get(0).isIsStatsCompliant(), false);
+ }
+
+ SharedCache.ColumStatsWithWriteId statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, CachedStore.partNameToVals(partName), colName[0], validWriteIds);
+ verifyStatDouble(statPartCache.getColumnStatisticsObj(), colName[0], highValue);
+
+ statPartCache = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+ CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+ verifyStatString(statPartCache.getColumnStatisticsObj(), colName[1], avgColLen);
+ }
+
+ private List<ColumnStatisticsObj> getStatsObjects(String dbName, String tblName, String[] colName,
+ double highValue, double avgColLen) throws Throwable {
double lowValue = 50000.21;
- double highValue = 1200000.4525;
long numNulls = 3;
long numDVs = 22;
- double avgColLen = 50.30;
long maxColLen = 102;
- String[] colType = new String[] {"double", "string"};
boolean isTblLevel = true;
String partName = null;
List<ColumnStatisticsObj> statsObjs = new ArrayList<>();
@@ -445,42 +533,96 @@ public class TestCachedStoreUpdateUsingEvents {
statsObj.setStatsData(statsData);
statsObjs.add(statsObj);
+ return statsObjs;
+ }
- ColumnStatistics colStats = new ColumnStatistics();
- colStats.setStatsDesc(statsDesc);
- colStats.setStatsObj(statsObjs);
+ private void verifyStatDouble(ColumnStatisticsObj colStats, String colName, double highValue) {
+ double lowValue = 50000.21;
+ long numNulls = 3;
+ long numDVs = 22;
+ Assert.assertEquals(colStats.getColName(), colName);
+ Assert.assertEquals(colStats.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01);
+ Assert.assertEquals(colStats.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+ Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumNulls(), numNulls);
+ Assert.assertEquals(colStats.getStatsData().getDoubleStats().getNumDVs(), numDVs);
+ }
- // write stats objs persistently
- hmsHandler.update_table_column_statistics(colStats);
-
- ColumnStatisticsObj colStatsCache = sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
- dbName, tblName, Lists.newArrayList(colName[0])).get(0);
- Assert.assertEquals(colStatsCache.getColName(), colName[0]);
- Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getLowValue(), lowValue, 0.01);
- Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
- Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumNulls(), numNulls);
- Assert.assertEquals(colStatsCache.getStatsData().getDoubleStats().getNumDVs(), numDVs);
-
- // test delete column stats; if no col name is passed all column stats associated with the
- // table is deleted
- boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null);
- Assert.assertEquals(status, true);
+ private void verifyStatString(ColumnStatisticsObj colStats, String colName, double avgColLen) {
+ long numNulls = 3;
+ long numDVs = 22;
+ long maxColLen = 102;
+ Assert.assertEquals(colStats.getColName(), colName);
+ Assert.assertEquals(colStats.getStatsData().getStringStats().getMaxColLen(), maxColLen);
+ Assert.assertEquals(colStats.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01);
+ Assert.assertEquals(colStats.getStatsData().getStringStats().getNumNulls(), numNulls);
+ Assert.assertEquals(colStats.getStatsData().getStringStats().getNumDVs(), numDVs);
+ }
- Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME,
- dbName, tblName, Lists.newArrayList(colName[0])).isEmpty(), true);
+ private void verifyStat(List<ColumnStatisticsObj> colStats, String[] colName, double highValue, double avgColLen) {
+ //verifyStatDouble(colStats.get(0), colName[0], highValue);
+ verifyStatString(colStats.get(0), colName[1], avgColLen);
+ }
- tblName = "tbl_part";
- cols = new ArrayList<>();
- cols.add(new FieldSchema(colName[0], "int", null));
+ private void setUpBeforeTest(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable {
+ String dbOwner = "user1";
+
+ // Prewarm CachedStore
+ CachedStore.setCachePrewarmedState(false);
+ CachedStore.prewarm(rawStore);
+
+ // Add a db via rawStore
+ Database db = createTestDb(dbName, dbOwner);
+ hmsHandler.create_database(db);
+ if (tblName != null) {
+ createTestTable(dbName, tblName, colName, isTxnTable);
+ }
+ }
+
+ private void createTestTable(String dbName, String tblName, String[] colName, boolean isTxnTable) throws Throwable {
+ // Add a table via rawStore
+ List<FieldSchema> cols = new ArrayList<FieldSchema>();
+ cols.add(new FieldSchema(colName[0], "int", "integer column"));
+ cols.add(new FieldSchema(colName[1], "string", "string column"));
+
+ Map<String, String> tableParams = new HashMap<>();
+ tableParams.put("test_param_1", "hi");
+ tableParams.put("test_param_2", "50");
+ if (isTxnTable) {
+ tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ }
+
+ String tblOwner = "testowner";
+
+ List<FieldSchema> ptnCols = new ArrayList<FieldSchema>();
+ ptnCols.add(new FieldSchema("ds", "string", "string partition column"));
+ ptnCols.add(new FieldSchema("hr", "int", "integer partition column"));
+
+ Table tbl = createTestTblParam(dbName, tblName, tblOwner, cols, null, tableParams);
+ hmsHandler.create_table(tbl);
+ }
+
+ private void createTableWithPart(String dbName, String tblName, String[] colName, boolean isTxnTbl) throws Throwable {
+ List<FieldSchema> cols = new ArrayList<>();
+ cols.add(new FieldSchema(colName[0], colType[0], null));
List<FieldSchema> partCols = new ArrayList<>();
- partCols.add(new FieldSchema("col", "int", null));
+ partCols.add(new FieldSchema(colName[0], colType[0], null));
+ Map<String, String> tableParams = new HashMap<>();
+ tableParams.put("test_param_1", "hi");
+ tableParams.put("test_param_2", "50");
+ if (isTxnTbl) {
+ tableParams.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ StatsSetupConst.setBasicStatsState(tableParams, StatsSetupConst.TRUE);
+ }
StorageDescriptor sd =
- new StorageDescriptor(cols, null, "input", "output", false,
+ new StorageDescriptor(cols, null, "orc",
+ "orc", false,
0, new SerDeInfo("serde", "seriallib", new HashMap<>()),
- null, null, null);
+ null, null, tableParams);
+ sd.setInputFormat(OrcInputFormat.class.getName());
+ sd.setOutputFormat(OrcOutputFormat.class.getName());
- tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
- null, null, TableType.MANAGED_TABLE.toString());
+ Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd,
+ partCols, tableParams, null, null, TableType.MANAGED_TABLE.toString());
tbl.setCatName(DEFAULT_CATALOG_NAME);
hmsHandler.create_table(tbl);
@@ -489,47 +631,405 @@ public class TestCachedStoreUpdateUsingEvents {
partVals1.add("1");
List<String> partVals2 = new ArrayList<>();
partVals2.add("2");
+ Map<String, String> partParams = new HashMap<>();
+ StatsSetupConst.setBasicStatsState(partParams, StatsSetupConst.TRUE);
+ EnvironmentContext environmentContext = new EnvironmentContext();
+ environmentContext.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
Partition ptn1 =
- new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
+ new Partition(partVals1, dbName, tblName, 0, 0, sd, partParams);
ptn1.setCatName(DEFAULT_CATALOG_NAME);
- hmsHandler.add_partition(ptn1);
+ hmsHandler.add_partition_with_environment_context(ptn1, environmentContext);
Partition ptn2 =
- new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
+ new Partition(partVals2, dbName, tblName, 0, 0, sd, partParams);
ptn2.setCatName(DEFAULT_CATALOG_NAME);
- hmsHandler.add_partition(ptn2);
+ hmsHandler.add_partition_with_environment_context(ptn2, environmentContext);
+ }
+
+ private List<Long> allocateTxns(int numTxns) throws Throwable {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "user", "host");
+ return hmsHandler.open_txns(openTxnRequest).getTxn_ids();
+ }
+
+ private List<TxnToWriteId> allocateWriteIds(List<Long> txnIds, String dbName, String tblName) throws Throwable {
+ AllocateTableWriteIdsRequest allocateTableWriteIdsRequest = new AllocateTableWriteIdsRequest(dbName, tblName);
+ allocateTableWriteIdsRequest.setTxnIds(txnIds);
+ return hmsHandler.allocate_table_write_ids(allocateTableWriteIdsRequest).getTxnToWriteIds();
+ }
+
+ private String getValidWriteIds(String dbName, String tblName) throws Throwable {
+ GetValidWriteIdsRequest validWriteIdsRequest = new GetValidWriteIdsRequest(
+ Collections.singletonList(TableName.getDbTable(dbName, tblName)));
+ GetValidWriteIdsResponse validWriteIdsResponse = hmsHandler.get_valid_write_ids(validWriteIdsRequest);
+ return TxnCommonUtils.createValidReaderWriteIdList(validWriteIdsResponse.getTblValidWriteIds().get(0)).writeToString();
+ }
+
+ private void validateTablePara(String dbName, String tblName) throws Throwable {
+ Table tblRead = rawStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName);
+ Table tblRead1 = sharedCache.getTableFromCache(DEFAULT_CATALOG_NAME, dbName, tblName);
+ Assert.assertEquals(tblRead.getParameters(), tblRead1.getParameters());
+ }
+ private void validatePartPara(String dbName, String tblName, String partName) throws Throwable {
+ //Partition part1 = rawStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, partName);
+ //Partition part2 = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tblName, partName);
+ //Assert.assertEquals(part1.getParameters(), part2.getParameters());
+ }
+
+ private void deleteColStats(String dbName, String tblName, String[] colName) throws Throwable {
+ boolean status = hmsHandler.delete_table_column_statistics(dbName, tblName, null);
+ Assert.assertEquals(status, true);
+ Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Lists.newArrayList(colName[0]), null, true).getStatsObj().isEmpty(), true);
+ Assert.assertEquals(sharedCache.getTableColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Lists.newArrayList(colName[1]), null, true).getStatsObj().isEmpty(), true);
+ validateTablePara(dbName, tblName);
+ }
+
+ private void deletePartColStats(String dbName, String tblName, String[] colName,
+ String partName) throws Throwable {
+ boolean status = hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
+ Assert.assertEquals(status, true);
+
+ SharedCache.ColumStatsWithWriteId colStats = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName,
+ tblName, CachedStore.partNameToVals(partName), colName[1], null);
+ Assert.assertEquals(colStats.getColumnStatisticsObj(), null);
+ validateTablePara(dbName, tblName);
+ }
+
+ private void testTableColStatInternal(String dbName, String tblName, boolean isTxnTable) throws Throwable {
+ String[] colName = new String[]{"income", "name"};
+ double highValue = 1200000.4525;
+ double avgColLen = 50.30;
+
+ setUpBeforeTest(dbName, tblName, colName, isTxnTable);
+ updateTableColStats(dbName, tblName, colName, highValue, avgColLen, isTxnTable);
+ if (!isTxnTable) {
+ deleteColStats(dbName, tblName, colName);
+ }
+
+ tblName = "tbl_part";
+ createTableWithPart(dbName, tblName, colName, isTxnTable);
List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
- partName = partitions.get(0);
- isTblLevel = false;
+ String partName = partitions.get(0);
+ updatePartColStats(dbName, tblName, isTxnTable, colName, partName, highValue, avgColLen);
+ if (!isTxnTable) {
+ deletePartColStats(dbName, tblName, colName, partName);
+ }
+ }
+
+ @Test
+ public void testTableColumnStatistics() throws Throwable {
+ String dbName = "column_stats_test_db";
+ String tblName = "tbl";
+ testTableColStatInternal(dbName, tblName, false);
+ }
+
+ @Test
+ public void testTableColumnStatisticsTxnTable() throws Throwable {
+ String dbName = "column_stats_test_db_txn";
+ String tblName = "tbl_txn";
+ testTableColStatInternal(dbName, tblName, true);
+ }
+
+ @Test
+ public void testTableColumnStatisticsTxnTableMulti() throws Throwable {
+ String dbName = "column_stats_test_db_txn_multi";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
+ double highValue = 1200000.4525;
+ double avgColLen = 50.30;
+
+ setUpBeforeTest(dbName, null, colName, true);
+ createTableWithPart(dbName, tblName, colName, true);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ String partName = partitions.get(0);
+ updatePartColStats(dbName, tblName, true, colName, partName, highValue, avgColLen);
+ updatePartColStats(dbName, tblName, true, colName, partName, 1200000.4521, avgColLen);
+ updatePartColStats(dbName, tblName, true, colName, partName, highValue, 34.78);
+ }
+
+ @Test
+ public void testTableColumnStatisticsTxnTableMultiAbort() throws Throwable {
+ String dbName = "column_stats_test_db_txn_multi_abort";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
+ double highValue = 1200000.4525;
+ double avgColLen = 50.30;
+
+ setUpBeforeTest(dbName, null, colName, true);
+ createTableWithPart(dbName, tblName, colName, true);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ String partName = partitions.get(0);
+
+ List<Long> txnIds = allocateTxns(1);
+ long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ String validWriteIds = getValidWriteIds(dbName, tblName);
// create a new columnstatistics desc to represent partition level column stats
- statsDesc = new ColumnStatisticsDesc();
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
statsDesc.setDbName(dbName);
statsDesc.setTableName(tblName);
statsDesc.setPartName(partName);
- statsDesc.setIsTblLevel(isTblLevel);
+ statsDesc.setIsTblLevel(false);
+
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
+
+ // write stats objs persistently
+ hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+ // abort the txn and verify that the stats we get back are not compliant.
+ AbortTxnRequest rqst = new AbortTxnRequest(txnIds.get(0));
+ hmsHandler.abort_txn(rqst);
+
+ allocateWriteIds(allocateTxns(1), dbName, tblName);
+ validWriteIds = getValidWriteIds(dbName, tblName);
+
+ Deadline.startTimer("getPartitionColumnStatistics");
+ List<ColumnStatistics> statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+ Deadline.stopTimer();
+
+ verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
+ Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+
+ List<ColumnStatistics> statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+ validWriteIds, true);
+ verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen);
+ Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false);
+
+ SharedCache.ColumStatsWithWriteId columStatsWithWriteId =
+ sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+ CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+ Assert.assertEquals(columStatsWithWriteId, null);
+ validatePartPara(dbName, tblName, partName);
+ }
+
+ @Test
+ public void testTableColumnStatisticsTxnTableOpenTxn() throws Throwable {
+ String dbName = "column_stats_test_db_txn_multi_open";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
+ double highValue = 1200000.4121;
+ double avgColLen = 23.30;
+
+ setUpBeforeTest(dbName, null, colName, true);
+ createTableWithPart(dbName, tblName, colName, true);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ String partName = partitions.get(0);
+
+ // update part col stats successfully.
+ updatePartColStats(dbName, tblName, true, colName, partName, 1.2, 12.2);
+
+ List<Long> txnIds = allocateTxns(1);
+ long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ String validWriteIds = getValidWriteIds(dbName, tblName);
+
+ // create a new columnstatistics desc to represent partition level column stats
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setPartName(partName);
+ statsDesc.setIsTblLevel(false);
+
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, highValue, avgColLen));
+
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
+
+ // write stats objs persistently
+ hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+ // keep the txn open and verify that the stats we get back are not compliant.
+
+ allocateWriteIds(allocateTxns(1), dbName, tblName);
+ validWriteIds = getValidWriteIds(dbName, tblName);
+
+ Deadline.startTimer("getPartitionColumnStatistics");
+ List<ColumnStatistics> statRawStore = rawStore.getPartitionColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Collections.singletonList(partName), Collections.singletonList(colName[1]), validWriteIds);
+ Deadline.stopTimer();
+
+ verifyStat(statRawStore.get(0).getStatsObj(), colName, highValue, avgColLen);
+ Assert.assertEquals(statRawStore.get(0).isIsStatsCompliant(), false);
+
+ List<ColumnStatistics> statsListFromCache = sharedCache.getPartitionColStatsListFromCache(DEFAULT_CATALOG_NAME,
+ dbName, tblName, Collections.singletonList(partName), Collections.singletonList(colName[1]),
+ validWriteIds, true);
+ verifyStat(statsListFromCache.get(0).getStatsObj(), colName, highValue, avgColLen);
+ Assert.assertEquals(statsListFromCache.get(0).isIsStatsCompliant(), false);
+
+ SharedCache.ColumStatsWithWriteId columStatsWithWriteId =
+ sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName,
+ tblName, CachedStore.partNameToVals(partName), colName[1], validWriteIds);
+ Assert.assertEquals(columStatsWithWriteId, null);
+ validatePartPara(dbName, tblName, partName);
+ }
+
+ private void verifyAggrStat(String dbName, String tblName, String[] colName, List<String> partitions,
+ boolean isTxnTbl, double highValue) throws Throwable {
+ List<Long> txnIds = allocateTxns(1);
+ allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ String validWriteIds = getValidWriteIds(dbName, tblName);
+
+ Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+ AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+ Collections.singletonList(colName[0]), validWriteIds);
+ Deadline.stopTimer();
+ Assert.assertEquals(aggrStats.getPartsFound(), 2);
+ Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+ //Assert.assertEquals(aggrStats.isIsStatsCompliant(), true);
+
+ // This will update the cache for non txn table.
+ PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+ Collections.singletonList(colName[0]), partitions);
+ request.setCatName(DEFAULT_CATALOG_NAME);
+ request.setValidWriteIdList(validWriteIds);
+ AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+ Assert.assertEquals(aggrStatsCached, aggrStats);
+ //Assert.assertEquals(aggrStatsCached.isIsStatsCompliant(), true);
+
+ List<ColumnStatisticsObj> stats = sharedCache.getAggrStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
+ Collections.singletonList(colName[0]), SharedCache.StatsType.ALL);
+ Assert.assertEquals(stats.get(0).getStatsData().getDoubleStats().getHighValue(), highValue, 0.01);
+ }
+
+ @Test
+ public void testAggrStat() throws Throwable {
+ String dbName = "aggr_stats_test";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
+
+ setUpBeforeTest(dbName, null, colName, false);
+ createTableWithPart(dbName, tblName, colName, false);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short) -1);
+ String partName = partitions.get(0);
+
+ // update part col stats successfully.
+ updatePartColStats(dbName, tblName, false, colName, partitions.get(0), 2, 12);
+ updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 4, 10);
+ verifyAggrStat(dbName, tblName, colName, partitions, false, 4);
+
+ updatePartColStats(dbName, tblName, false, colName, partitions.get(1), 3, 10);
+ verifyAggrStat(dbName, tblName, colName, partitions, false, 3);
+ }
+
+ @Test
+ public void testAggrStatTxnTable() throws Throwable {
+ String dbName = "aggr_stats_test_db_txn";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
- colStats = new ColumnStatistics();
+ setUpBeforeTest(dbName, null, colName, true);
+ createTableWithPart(dbName, tblName, colName, true);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ String partName = partitions.get(0);
+
+ // update part col stats successfully.
+ updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
+ updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+ updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 3, 10);
+ verifyAggrStat(dbName, tblName, colName, partitions, true, 3);
+
+ List<Long> txnIds = allocateTxns(1);
+ long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ String validWriteIds = getValidWriteIds(dbName, tblName);
+
+ // create a new columnstatistics desc to represent partition level column stats
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setPartName(partName);
+ statsDesc.setIsTblLevel(false);
+
+ ColumnStatistics colStats = new ColumnStatistics();
+ colStats.setStatsDesc(statsDesc);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
+ hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+ Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+ AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+ Collections.singletonList(colName[0]), validWriteIds);
+ Deadline.stopTimer();
+ Assert.assertEquals(aggrStats, null);
+
+ // keep the txn open and verify that the stats we get back are not compliant.
+ PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+ Collections.singletonList(colName[0]), partitions);
+ request.setCatName(DEFAULT_CATALOG_NAME);
+ request.setValidWriteIdList(validWriteIds);
+ AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+ Assert.assertEquals(aggrStatsCached, null);
+ }
+
+ @Test
+ public void testAggrStatAbortTxn() throws Throwable {
+ String dbName = "aggr_stats_test_db_txn_abort";
+ String tblName = "tbl_part";
+ String[] colName = new String[]{"income", "name"};
+
+ setUpBeforeTest(dbName, null, colName, true);
+ createTableWithPart(dbName, tblName, colName, true);
+ List<String> partitions = hmsHandler.get_partition_names(dbName, tblName, (short)-1);
+ String partName = partitions.get(0);
+
+ // update part col stats successfully.
+ updatePartColStats(dbName, tblName, true, colName, partitions.get(0), 2, 12);
+ updatePartColStats(dbName, tblName, true, colName, partitions.get(1), 4, 10);
+ verifyAggrStat(dbName, tblName, colName, partitions, true, 4);
+
+ List<Long> txnIds = allocateTxns(4);
+ long writeId = allocateWriteIds(txnIds, dbName, tblName).get(0).getWriteId();
+ String validWriteIds = getValidWriteIds(dbName, tblName);
+
+ // create a new columnstatistics desc to represent partition level column stats
+ ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc();
+ statsDesc.setDbName(dbName);
+ statsDesc.setTableName(tblName);
+ statsDesc.setPartName(partName);
+ statsDesc.setIsTblLevel(false);
+
+ ColumnStatistics colStats = new ColumnStatistics();
colStats.setStatsDesc(statsDesc);
- colStats.setStatsObj(statsObjs);
-
- hmsHandler.update_partition_column_statistics(colStats);
- ColumnStatisticsObj colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
- CachedStore.partNameToVals(partName), colName[1]);
- // compare stats obj to ensure what we get is what we wrote
- Assert.assertEquals(colStats.getStatsDesc().getPartName(), partName);
- Assert.assertEquals(colStats2.getColName(), colName[1]);
- Assert.assertEquals(colStats2.getStatsData().getStringStats().getMaxColLen(), maxColLen);
- Assert.assertEquals(colStats2.getStatsData().getStringStats().getAvgColLen(), avgColLen, 0.01);
- Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumNulls(), numNulls);
- Assert.assertEquals(colStats2.getStatsData().getStringStats().getNumDVs(), numDVs);
-
- // test stats deletion at partition level
- hmsHandler.delete_partition_column_statistics(dbName, tblName, partName, colName[1]);
-
- colStats2 = sharedCache.getPartitionColStatsFromCache(DEFAULT_CATALOG_NAME, dbName, tblName,
- CachedStore.partNameToVals(partName), colName[1]);
- Assert.assertEquals(colStats2, null);
+ colStats.setStatsObj(getStatsObjects(dbName, tblName, colName, 5, 20));
+
+ SetPartitionsStatsRequest setTblColStat = new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+ setTblColStat.setWriteId(writeId);
+ setTblColStat.setValidWriteIdList(validWriteIds);
+ hmsHandler.update_partition_column_statistics_req(setTblColStat);
+
+ AbortTxnRequest abortTxnRequest = new AbortTxnRequest(txnIds.get(0));
+ hmsHandler.abort_txn(abortTxnRequest);
+
+ Deadline.startTimer("getPartitionSpecsByFilterAndProjection");
+ AggrStats aggrStats = rawStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, partitions,
+ Collections.singletonList(colName[0]), validWriteIds);
+ Deadline.stopTimer();
+ Assert.assertEquals(aggrStats, null);
+
+ // the txn was aborted above, so verify that aggregate stats are not returned.
+ PartitionsStatsRequest request = new PartitionsStatsRequest(dbName, tblName,
+ Collections.singletonList(colName[0]), partitions);
+ request.setCatName(DEFAULT_CATALOG_NAME);
+ request.setValidWriteIdList(validWriteIds);
+ AggrStats aggrStatsCached = hmsHandler.get_aggr_stats_for(request);
+ Assert.assertEquals(aggrStatsCached, null);
}
}
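The tests above pin down the write-id semantics: stats written under an open or aborted transaction come back with isIsStatsCompliant() == false from both the RawStore and the SharedCache, and aggregate stats are withheld (null) until the writer commits. The reader-side write-id list everything is validated against is built as in this sketch, condensed from the getValidWriteIds helper above:

    // Build the ValidWriteIdList string a reader presents for db.tbl.
    GetValidWriteIdsRequest req = new GetValidWriteIdsRequest(
        Collections.singletonList(TableName.getDbTable(dbName, tblName)));
    String validWriteIds = TxnCommonUtils.createValidReaderWriteIdList(
        hmsHandler.get_valid_write_ids(req).getTblValidWriteIds().get(0)).writeToString();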
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index c028e12..7c1944f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -448,7 +448,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
}
// TODO: we should probably skip updating if writeId is from an active txn
boolean isTxnValid = (writeIdString == null) || ObjectStore.isCurrentStatsValidForTheQuery(
- conf, params, statsWriteId , writeIdString, false);
+ params, statsWriteId, writeIdString, false);
return getExistingStatsToUpdate(existingStats, params, isTxnValid);
}
@@ -473,7 +473,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
}
// TODO: we should probably skip updating if writeId is from an active txn
if (writeIdString != null && !ObjectStore.isCurrentStatsValidForTheQuery(
- conf, params, statsWriteId, writeIdString, false)) {
+ params, statsWriteId, writeIdString, false)) {
return allCols;
}
List<String> colsToUpdate = new ArrayList<>();
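The only change in this file is the narrowed helper signature: isCurrentStatsValidForTheQuery no longer needs a Configuration, since the verdict depends only on the stats parameters, the stats writeId, and the query's write-id list. A caller-side sketch under the new signature (tbl here is any metastore Table; illustrative only):

    // true when the stats writer's writeId is committed and visible to this reader.
    boolean valid = ObjectStore.isCurrentStatsValidForTheQuery(
        tbl.getParameters(), tbl.getWriteId(), queryValidWriteIdList, false);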
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index b43fb5e..4472f99 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -3684,7 +3684,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
try {
boolean madeDir = createLocationForAddedPartition(table, partition);
addedParts.put(new PartValEqWrapperLite(partition), madeDir);
- initializeAddedPartition(table, partition, madeDir);
+ initializeAddedPartition(table, partition, madeDir, null);
} catch (MetaException e) {
throw new IOException(e.getMessage(), e);
}
@@ -3973,15 +3973,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return true;
}
- private void initializeAddedPartition(
- final Table tbl, final Partition part, boolean madeDir) throws MetaException {
- initializeAddedPartition(tbl, new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir);
+ private void initializeAddedPartition(final Table tbl, final Partition part, boolean madeDir,
+ EnvironmentContext environmentContext) throws MetaException {
+ initializeAddedPartition(tbl,
+ new PartitionSpecProxy.SimplePartitionWrapperIterator(part), madeDir, environmentContext);
}
private void initializeAddedPartition(
- final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir) throws MetaException {
+ final Table tbl, final PartitionSpecProxy.PartitionIterator part, boolean madeDir,
+ EnvironmentContext environmentContext) throws MetaException {
if (canUpdateStats(tbl)) {
- MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir, false, null, true);
+ MetaStoreServerUtils.updatePartitionStatsFast(part, tbl, wh, madeDir,
+ false, environmentContext, true);
}
// set create time
@@ -4046,7 +4049,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
assert shouldAdd; // start would throw if it already existed here
boolean madeDir = createLocationForAddedPartition(tbl, part);
try {
- initializeAddedPartition(tbl, part, madeDir);
+ initializeAddedPartition(tbl, part, madeDir, envContext);
initializePartitionParameters(tbl, part);
success = ms.addPartition(part);
} finally {
@@ -5989,13 +5992,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds,
+ new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
writeId, this));
}
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.UPDATE_TABLE_COLUMN_STAT,
- new UpdateTableColumnStatEvent(colStats, tableObj, parameters, validWriteIds,
+ new UpdateTableColumnStatEvent(colStats, tableObj, parameters,
writeId,this));
}
}
@@ -6055,13 +6058,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.UPDATE_PARTITION_COLUMN_STAT,
- new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds,
+ new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl,
writeId, this));
}
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.UPDATE_PARTITION_COLUMN_STAT,
- new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl, validWriteIds,
+ new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, tbl,
writeId, this));
}
}
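initializeAddedPartition now threads the caller's EnvironmentContext down to updatePartitionStatsFast, so a client that has already computed stats can mark them task-generated instead of having them recomputed. The new test exercises this path roughly as follows (sketch drawn from the test hunk above; ptn is a Partition built by the test):

    // Mark stats as task-generated so add_partition records them as accurate.
    EnvironmentContext ctx = new EnvironmentContext();
    ctx.putToProperties(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
    hmsHandler.add_partition_with_environment_context(ptn, ctx);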
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 0485fe9..c0bae3b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -8629,7 +8629,7 @@ public class ObjectStore implements RawStore, Configurable {
// Make sure we set the flag to invalid regardless of the current value.
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the partition "
- + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName());
+ + statsDesc.getDbName() + "." + statsDesc.getTableName() + "." + statsDesc.getPartName());
}
mPartition.setWriteId(writeId);
}
@@ -12479,7 +12479,7 @@ public class ObjectStore implements RawStore, Configurable {
*/
private boolean isCurrentStatsValidForTheQuery(MTable tbl, String queryValidWriteIdList,
boolean isCompleteStatsWriter) throws MetaException {
- return isCurrentStatsValidForTheQuery(conf, tbl.getParameters(), tbl.getWriteId(),
+ return isCurrentStatsValidForTheQuery(tbl.getParameters(), tbl.getWriteId(),
queryValidWriteIdList, isCompleteStatsWriter);
}
@@ -12499,19 +12499,19 @@ public class ObjectStore implements RawStore, Configurable {
private boolean isCurrentStatsValidForTheQuery(MPartition part,
String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException {
- return isCurrentStatsValidForTheQuery(conf, part.getParameters(), part.getWriteId(),
+ return isCurrentStatsValidForTheQuery(part.getParameters(), part.getWriteId(),
queryValidWriteIdList, isCompleteStatsWriter);
}
private boolean isCurrentStatsValidForTheQuery(Partition part, long partWriteId,
String queryValidWriteIdList, boolean isCompleteStatsWriter) throws MetaException {
- return isCurrentStatsValidForTheQuery(conf, part.getParameters(), partWriteId,
+ return isCurrentStatsValidForTheQuery(part.getParameters(), partWriteId,
queryValidWriteIdList, isCompleteStatsWriter);
}
// TODO: move to somewhere else
- public static boolean isCurrentStatsValidForTheQuery(Configuration conf,
+ public static boolean isCurrentStatsValidForTheQuery(
Map<String, String> statsParams, long statsWriteId, String queryValidWriteIdList,
boolean isCompleteStatsWriter) throws MetaException {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 7ad4bd2..182d5cc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.HiveAlterHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType;
import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
@@ -205,7 +206,7 @@ public class CachedStore implements RawStore, Configurable {
return sharedCache;
}
- static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName,
+ static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName,
String dbName, String tableName, Partition part) throws Exception {
ColumnStatistics colStats;
List<String> deletedCols = new ArrayList<>();
@@ -215,32 +216,31 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column);
}
if (colStats != null) {
- sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(), colStats.getStatsObj());
+ sharedCache.alterPartitionAndStatsInCache(catalogName, dbName, tableName, part.getWriteId(),
+ part.getValues(), part.getParameters(), colStats.getStatsObj());
}
return colStats;
}
- static private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName,
+ static private void updateStatsForAlterTable(RawStore rawStore, Table tblBefore, Table tblAfter, String catalogName,
String dbName, String tableName) throws Exception {
ColumnStatistics colStats = null;
List<String> deletedCols = new ArrayList<>();
- if (before.isSetPartitionKeys()) {
+ if (tblBefore.isSetPartitionKeys()) {
List<Partition> parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1);
for (Partition part : parts) {
- colStats = updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part);
+ colStats = updateStatsForAlterPart(rawStore, tblBefore, catalogName, dbName, tableName, part);
}
}
- boolean needUpdateAggrStat = false;
- List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before,
- after,null, null, rawStore.getConf(), deletedCols);
+ List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, tblBefore,
+ tblAfter,null, null, rawStore.getConf(), deletedCols);
if (colStats != null) {
- sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs);
- needUpdateAggrStat = true;
+ sharedCache.alterTableAndStatsInCache(catalogName, dbName, tableName, tblAfter.getWriteId(),
+ statisticsObjs, tblAfter.getParameters());
}
for (String column : deletedCols) {
sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column);
- needUpdateAggrStat = true;
}
}
@@ -309,10 +309,8 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.alterPartitionInCache(catalogName, dbName, tableName,
alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter());
//TODO : Use the stat object stored in the alter table message to update the stats in cache.
- if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(),
- catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) {
- CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName);
- }
+ updateStatsForAlterPart(rawStore, alterPartitionMessage.getTableObj(),
+ catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter());
break;
case MessageBuilder.DROP_PARTITION_EVENT:
DropPartitionMessage dropPartitionMessage =
deserializer.getDropPartitionMessage(message);
@@ -329,7 +327,7 @@ public class CachedStore implements RawStore, Configurable {
AlterTableMessage alterTableMessage =
deserializer.getAlterTableMessage(message);
sharedCache.alterTableInCache(catalogName, dbName, tableName,
alterTableMessage.getTableObjAfter());
//TODO : Use the stat object stored in the alter table message to
update the stats in cache.
- updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(),
alterTableMessage.getTableObjAfter(),
+ updateStatsForAlterTable(rawStore,
alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(),
catalogName, dbName, tableName);
break;
case MessageBuilder.DROP_TABLE_EVENT:
@@ -371,8 +369,8 @@ public class CachedStore implements RawStore, Configurable {
break;
case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT:
UpdateTableColumnStatMessage msg =
deserializer.getUpdateTableColumnStatMessage(message);
- updateTableColumnsStatsInternal(rawStore.getConf(),
msg.getColumnStatistics(), msg.getParameters(),
- msg.getValidWriteIds(), msg.getWriteId());
+ sharedCache.alterTableAndStatsInCache(catalogName, dbName,
tableName, msg.getWriteId(),
+ msg.getColumnStatistics().getStatsObj(),
msg.getParameters());
break;
case MessageBuilder.DELETE_TBL_COL_STAT_EVENT:
DeleteTableColumnStatMessage msgDel =
deserializer.getDeleteTableColumnStatMessage(message);
@@ -380,7 +378,8 @@ public class CachedStore implements RawStore, Configurable {
break;
case MessageBuilder.UPDATE_PART_COL_STAT_EVENT:
UpdatePartitionColumnStatMessage msgPartUpdate =
deserializer.getUpdatePartitionColumnStatMessage(message);
- sharedCache.updatePartitionColStatsInCache(catalogName, dbName,
tableName, msgPartUpdate.getPartVals(),
+ sharedCache.alterPartitionAndStatsInCache(catalogName, dbName,
tableName, msgPartUpdate.getWriteId(),
+ msgPartUpdate.getPartVals(), msgPartUpdate.getParameters(),
msgPartUpdate.getColumnStatistics().getStatsObj());
break;
case MessageBuilder.DELETE_PART_COL_STAT_EVENT:
@@ -909,7 +908,7 @@ public class CachedStore implements RawStore, Configurable {
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
StringUtils.normalizeIdentifier(dbName),
StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ aggrStatsAllButDefaultPartition, null);
}
} catch (MetaException | NoSuchObjectException e) {
LOG.info("Updating CachedStore: unable to read aggregate column stats
of table: " + tblName,
@@ -948,7 +947,12 @@ public class CachedStore implements RawStore, Configurable
{
// the event related to the current transactions are updated in the cache
and thus we can support strong
// consistency in case there is only one metastore.
if (canUseEvents) {
- triggerUpdateUsingEvent(rawStore);
+ try {
+ triggerUpdateUsingEvent(rawStore);
+ } catch (Exception e) {
+ //TODO : Not sure how to handle this, as the commit has already been done in the object store.
+ LOG.error("Failed to update cache", e);
+ }
}
return true;
}
@@ -2000,8 +2004,7 @@ public class CachedStore implements RawStore,
Configurable {
Map<String, String> params, long statsWriteId, String validWriteIds)
throws MetaException {
if (!TxnUtils.isTransactionalTable(tableParams)) return params; // Not a
txn table.
if (areTxnStatsSupported && ((validWriteIds == null)
- || ObjectStore.isCurrentStatsValidForTheQuery(
- conf, params, statsWriteId, validWriteIds, false))) {
+ || ObjectStore.isCurrentStatsValidForTheQuery(params, statsWriteId,
validWriteIds, false))) {
// Valid stats are supported for txn tables, and either no verification
was requested by the
// caller, or the verification has succeeded.
return params;
@@ -2014,14 +2017,14 @@ public class CachedStore implements RawStore,
Configurable {
// Note: ideally this should be above both CachedStore and ObjectStore.
- private ColumnStatistics adjustColStatForGet(Map<String, String> tableParams,
- Map<String, String> params, ColumnStatistics colStat, long statsWriteId,
- String validWriteIds) throws MetaException {
+ public static ColumnStatistics adjustColStatForGet(Map<String, String>
tableParams,
+ ColumnStatistics colStat, long
statsWriteId,
+ String validWriteIds, boolean areTxnStatsSupported) throws MetaException
{
colStat.setIsStatsCompliant(true);
if (!TxnUtils.isTransactionalTable(tableParams)) return colStat; // Not a
txn table.
if (areTxnStatsSupported && ((validWriteIds == null)
|| ObjectStore.isCurrentStatsValidForTheQuery(
- conf, params, statsWriteId, validWriteIds, false))) {
+ tableParams, statsWriteId, validWriteIds, false))) {
// Valid stats are supported for txn tables, and either no verification
was requested by the
// caller, or the verification has succeeded.
return colStat;
@@ -2058,7 +2061,7 @@ public class CachedStore implements RawStore,
Configurable {
if (errorMsg != null) {
throw new MetaException(errorMsg);
}
- if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams,
table.getWriteId(),
+ if (!ObjectStore.isCurrentStatsValidForTheQuery(newParams,
table.getWriteId(),
validWriteIds, true)) {
// Make sure we set the flag to invalid regardless of the current
value.
StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
@@ -2111,11 +2114,14 @@ public class CachedStore implements RawStore,
Configurable {
return rawStore.getTableColumnStatistics(
catName, dbName, tblName, colNames, validWriteIds);
}
- ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName);
- List<ColumnStatisticsObj> colStatObjs =
- sharedCache.getTableColStatsFromCache(catName, dbName, tblName,
colNames);
- return adjustColStatForGet(table.getParameters(), table.getParameters(),
- new ColumnStatistics(csd, colStatObjs), table.getWriteId(),
validWriteIds);
+ ColumnStatistics columnStatistics =
+ sharedCache.getTableColStatsFromCache(catName, dbName, tblName,
colNames, validWriteIds, areTxnStatsSupported);
+ if (columnStatistics == null) {
+ LOG.info("Stat of Table {}.{} for column {} is not present in cache." +
+ "Getting from raw store", dbName, tblName, colNames);
+ return rawStore.getTableColumnStatistics(catName, dbName, tblName,
colNames, validWriteIds);
+ }
+ return columnStatistics;
}
@Override
@@ -2170,10 +2176,17 @@ public class CachedStore implements RawStore,
Configurable {
String catName, String dbName, String tblName, List<String> partNames,
List<String> colNames, String writeIdList)
throws MetaException, NoSuchObjectException {
- // TODO: why have updatePartitionColumnStatistics cache if this is a
bypass?
- // Note: when implemented, this needs to call adjustColStatForGet, like
other get methods.
- return rawStore.getPartitionColumnStatistics(
- catName, dbName, tblName, partNames, colNames, writeIdList);
+
+ // If writeIdList is not null, stats are being requested within a txn context, so mark the stats
+ // non-compliant if areTxnStatsSupported is false or if the write id that last updated the stats
+ // is not compatible with writeIdList. This is done under the table lock because there may be more
+ // than one partition and we need a consistent view across all of them.
+ List<ColumnStatistics> columnStatistics =
sharedCache.getPartitionColStatsListFromCache(catName, dbName, tblName,
+ partNames, colNames, writeIdList, areTxnStatsSupported);
+ if (columnStatistics == null) {
+ return rawStore.getPartitionColumnStatistics(catName, dbName, tblName,
partNames, colNames, writeIdList);
+ }
+ return columnStatistics;
}
@Override
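For illustration, the compliance rule described in the comment in the hunk above can be sketched as a small standalone helper. The class and method names here are hypothetical and not part of this patch; only the rule itself is taken from the code above:

    // Hypothetical sketch of the stats-compliance rule described above; not part of this patch.
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    class StatsComplianceSketch {
      // Cached stats written by statsWriteId may be served as compliant to a query
      // carrying writeIdList only when txn stats are supported and the updating
      // write id is visible to that query.
      static boolean isCompliant(String writeIdList, long statsWriteId, boolean areTxnStatsSupported) {
        if (writeIdList == null) {
          return true;  // no txn context, so no verification was requested
        }
        if (!areTxnStatsSupported) {
          return false; // txn stats are disabled, so always non-compliant
        }
        ValidWriteIdList validForQuery = new ValidReaderWriteIdList(writeIdList);
        return validForQuery.isWriteIdValid(statsWriteId);
      }
    }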
@@ -2212,8 +2225,8 @@ public class CachedStore implements RawStore,
Configurable {
tblName = StringUtils.normalizeIdentifier(tblName);
// TODO: we currently cannot do transactional checks for stats here
// (incl. due to lack of sync w.r.t. the below rawStore call).
- //TODO : need to calculate aggregate locally in cached store
- if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null ||
canUseEvents) {
+ // When the cache is updated using events, aggregates are calculated locally and thus can be read from the cache.
+ if (!shouldCacheTable(catName, dbName, tblName) || (writeIdList != null &&
!canUseEvents)) {
return rawStore.get_aggr_stats_for(
catName, dbName, tblName, partNames, colNames, writeIdList);
}
@@ -2225,45 +2238,68 @@ public class CachedStore implements RawStore,
Configurable {
}
List<String> allPartNames = rawStore.listPartitionNames(catName, dbName,
tblName, (short) -1);
+ StatsType type = StatsType.PARTIAL;
if (partNames.size() == allPartNames.size()) {
colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName,
colNames, StatsType.ALL);
if (colStats != null) {
return new AggrStats(colStats, partNames.size());
}
+ type = StatsType.ALL;
} else if (partNames.size() == (allPartNames.size() - 1)) {
String defaultPartitionName = MetastoreConf.getVar(getConf(),
ConfVars.DEFAULTPARTITIONNAME);
if (!partNames.contains(defaultPartitionName)) {
- colStats =
- sharedCache.getAggrStatsFromCache(catName, dbName, tblName,
colNames, StatsType.ALLBUTDEFAULT);
+ colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName,
colNames, StatsType.ALLBUTDEFAULT);
if (colStats != null) {
return new AggrStats(colStats, partNames.size());
}
+ type = StatsType.ALLBUTDEFAULT;
}
}
+
LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {},
parts= {}, cols= {}",
tblName, partNames, colNames);
- MergedColumnStatsForPartitions mergedColStats =
- mergeColStatsForPartitions(catName, dbName, tblName, partNames,
colNames, sharedCache);
+ MergedColumnStatsForPartitions mergedColStats =
mergeColStatsForPartitions(catName, dbName, tblName,
+ partNames, colNames, sharedCache, type, writeIdList);
+ if (mergedColStats == null) {
+ LOG.info("Aggregate stats of partition " +
TableName.getQualified(catName, dbName, tblName) + "." +
+ partNames + " for columns " + colNames + " is not present in
cache. Getting it from raw store");
+ return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames,
colNames, writeIdList);
+ }
return new AggrStats(mergedColStats.getColStats(),
mergedColStats.getPartsFound());
}
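The lookup order implemented above (precomputed ALL entry, then ALLBUTDEFAULT, then a local merge, then the raw store as a last resort) can be summarized in a small sketch. All names below are illustrative, not from this patch:

    // Illustrative sketch of the aggregate-stats lookup order used above.
    import java.util.List;
    import java.util.function.Supplier;

    class AggrLookupSketch<T> {
      T lookup(List<String> requested, List<String> all, String defaultPartitionName,
          Supplier<T> cachedAll, Supplier<T> cachedAllButDefault,
          Supplier<T> mergeFromCache, Supplier<T> rawStore) {
        T result = null;
        if (requested.size() == all.size()) {
          result = cachedAll.get();                      // StatsType.ALL
        } else if (requested.size() == all.size() - 1
            && !requested.contains(defaultPartitionName)) {
          result = cachedAllButDefault.get();            // StatsType.ALLBUTDEFAULT
        }
        if (result == null) {
          result = mergeFromCache.get();                 // merge per-partition stats locally
        }
        return result != null ? result : rawStore.get(); // last resort: the raw store
      }
    }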
private MergedColumnStatsForPartitions mergeColStatsForPartitions(
String catName, String dbName, String tblName, List<String> partNames,
List<String> colNames,
- SharedCache sharedCache) throws MetaException {
+ SharedCache sharedCache, StatsType type, String writeIdList) throws
MetaException {
final boolean useDensityFunctionForNDVEstimation =
MetastoreConf.getBoolVar(getConf(),
ConfVars.STATS_NDV_DENSITY_FUNCTION);
final double ndvTuner = MetastoreConf.getDoubleVar(getConf(),
ConfVars.STATS_NDV_TUNER);
Map<ColumnStatsAggregator, List<ColStatsObjWithSourceInfo>> colStatsMap =
new HashMap<>();
- boolean areAllPartsFound = true;
- long partsFound = 0;
+ long partsFound = partNames.size();
+ Map<List<String>, Long> partNameToWriteId = writeIdList != null ? new
HashMap<>() : null;
for (String colName : colNames) {
long partsFoundForColumn = 0;
ColumnStatsAggregator colStatsAggregator = null;
List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new
ArrayList<>();
for (String partName : partNames) {
- ColumnStatisticsObj colStatsForPart =
- sharedCache.getPartitionColStatsFromCache(catName, dbName,
tblName, partNameToVals(partName), colName);
- if (colStatsForPart != null) {
+ List<String> partValue = partNameToVals(partName);
+ // There are three possible results from getPartitionColStatsFromCache:
+ // 1. The partition has valid stats, so the returned colStatsWriteId is a valid non-null value.
+ // 2. The partition's stats are missing from the cache, so colStatsWriteId is non-null but the
+ //    column stats inside it are null. In this case the partition is left out of the aggregate
+ //    calculation, matching the behavior of the object store.
+ // 3. The partition is missing, or its stats were updated by a live (not yet committed) or aborted
+ //    txn. In this case colStatsWriteId is null, and null is returned to match the behavior of the
+ //    object store.
+ SharedCache.ColumStatsWithWriteId colStatsWriteId =
sharedCache.getPartitionColStatsFromCache(catName, dbName,
+ tblName, partValue, colName, writeIdList);
+ if (colStatsWriteId == null) {
+ return null;
+ }
+ if (colStatsWriteId.getColumnStatisticsObj() != null) {
+ ColumnStatisticsObj colStatsForPart =
colStatsWriteId.getColumnStatisticsObj();
+ if (partNameToWriteId != null) {
+ partNameToWriteId.put(partValue, colStatsWriteId.getWriteId());
+ }
ColStatsObjWithSourceInfo colStatsWithPartInfo =
new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName,
tblName, partName);
colStatsWithPartInfoList.add(colStatsWithPartInfo);
@@ -2282,7 +2318,9 @@ public class CachedStore implements RawStore,
Configurable {
if (colStatsWithPartInfoList.size() > 0) {
colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList);
}
- if (partsFoundForColumn == partNames.size()) {
+ // Set partsFound to the minimum of partsFoundForColumn across all columns; partsFound is the
+ // number of partitions for which stats for all columns are present in the cache (see the sketch
+ // after this hunk).
+ if (partsFoundForColumn < partsFound) {
partsFound = partsFoundForColumn;
}
if (colStatsMap.size() < 1) {
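A self-contained sketch of the partsFound bookkeeping referenced above, under the assumption that it is simply a minimum taken across columns; names are hypothetical, not from this patch:

    // Illustrative sketch: partsFound starts at the number of requested partitions
    // and is lowered to the smallest per-column count, so aggregates are treated as
    // exact only when every column had stats for every partition.
    class PartsFoundSketch {
      static long partsFound(long totalParts, long[] partsFoundPerColumn) {
        long partsFound = totalParts;
        for (long forColumn : partsFoundPerColumn) {
          if (forColumn < partsFound) {
            partsFound = forColumn;
          }
        }
        return partsFound;
      }

      public static void main(String[] args) {
        // 10 partitions requested; one column only had stats for 8 of them.
        System.out.println(partsFound(10, new long[] {10, 8, 10})); // prints 8
      }
    }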
@@ -2293,8 +2331,23 @@ public class CachedStore implements RawStore,
Configurable {
}
// Note that enableBitVector does not apply here because
ColumnStatisticsObj
// itself will tell whether bitvector is null or not and aggr logic can
automatically apply.
- return new
MergedColumnStatsForPartitions(MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
- partNames, areAllPartsFound, useDensityFunctionForNDVEstimation,
ndvTuner), partsFound);
+ List<ColumnStatisticsObj> colAggrStats =
MetaStoreServerUtils.aggrPartitionStats(colStatsMap,
+ partNames, partsFound == partNames.size(),
useDensityFunctionForNDVEstimation, ndvTuner);
+
+ if (canUseEvents) {
+ if (type == StatsType.ALL) {
+
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+ StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), new
AggrStats(colAggrStats, partsFound),
+ null, partNameToWriteId);
+ } else if (type == StatsType.ALLBUTDEFAULT) {
+
sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName),
+ StringUtils.normalizeIdentifier(dbName),
+ StringUtils.normalizeIdentifier(tblName), null,
+ new AggrStats(colAggrStats, partsFound), partNameToWriteId);
+ }
+ }
+ return new MergedColumnStatsForPartitions(colAggrStats, partsFound);
}
class MergedColumnStatsForPartitions {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index ce9e383..1c23022 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -33,13 +33,18 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.TreeMap;
-import org.apache.hadoop.hive.metastore.StatObjectConverter;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.apache.hadoop.hive.metastore.ObjectStore;
+import org.apache.hadoop.hive.metastore.StatObjectConverter;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -47,6 +52,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator;
@@ -56,6 +62,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import static
org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals;
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
public class SharedCache {
@@ -85,7 +92,7 @@ public class SharedCache {
private static HashMap<Class<?>, ObjectEstimator> sizeEstimators = null;
enum StatsType {
- ALL(0), ALLBUTDEFAULT(1);
+ ALL(0), ALLBUTDEFAULT(1), PARTIAL(2);
private final int position;
@@ -249,6 +256,7 @@ public class SharedCache {
tableLock.readLock().lock();
PartitionWrapper wrapper =
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
if (wrapper == null) {
+ LOG.debug("Partition: " + partVals + " is not present in the
cache.");
return null;
}
part = CacheUtils.assemble(wrapper, sharedCache);
@@ -342,6 +350,26 @@ public class SharedCache {
}
}
+ public void alterPartitionAndStats(List<String> partVals, SharedCache
sharedCache, long writeId,
+ Map<String,String> parameters,
List<ColumnStatisticsObj> colStatsObjs) {
+ try {
+ tableLock.writeLock().lock();
+ PartitionWrapper partitionWrapper =
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVals));
+ if (partitionWrapper == null) {
+ LOG.info("Partition " + partVals + " is missing from cache. Cannot
update the partition stats in cache.");
+ return;
+ }
+ Partition newPart = partitionWrapper.getPartition();
+ newPart.setParameters(parameters);
+ newPart.setWriteId(writeId);
+ removePartition(partVals, sharedCache);
+ cachePartition(newPart, sharedCache);
+ updatePartitionColStats(partVals, colStatsObjs);
+ } finally {
+ tableLock.writeLock().unlock();
+ }
+ }
+
public void alterPartitions(List<List<String>> partValsList,
List<Partition> newParts,
SharedCache sharedCache) {
try {
@@ -445,7 +473,9 @@ public class SharedCache {
}
}
- public List<ColumnStatisticsObj> getCachedTableColStats(List<String>
colNames) {
+ public ColumnStatistics getCachedTableColStats(ColumnStatisticsDesc csd,
List<String> colNames,
+ String
validWriteIds, boolean areTxnStatsSupported)
+ throws MetaException {
List<ColumnStatisticsObj> colStatObjs = new
ArrayList<ColumnStatisticsObj>();
try {
tableLock.readLock().lock();
@@ -455,10 +485,11 @@ public class SharedCache {
colStatObjs.add(colStatObj);
}
}
+ return CachedStore.adjustColStatForGet(getTable().getParameters(), new
ColumnStatistics(csd, colStatObjs),
+ getTable().getWriteId(), validWriteIds, areTxnStatsSupported);
} finally {
tableLock.readLock().unlock();
}
- return colStatObjs;
}
public void removeTableColStats(String colName) {
@@ -485,16 +516,88 @@ public class SharedCache {
}
}
- public ColumnStatisticsObj getPartitionColStats(List<String> partVal,
String colName) {
+ public ColumStatsWithWriteId getPartitionColStats(List<String> partVal,
String colName, String writeIdList) {
try {
tableLock.readLock().lock();
- return partitionColStatsCache
- .get(CacheUtils.buildPartitonColStatsCacheKey(partVal, colName));
+ ColumnStatisticsObj statisticsObj =
+
partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal,
colName));
+ if (statisticsObj == null || writeIdList == null) {
+ return new ColumStatsWithWriteId(-1, statisticsObj);
+ }
+ PartitionWrapper wrapper =
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+ if (wrapper == null) {
+ LOG.info("Partition: " + partVal + " is not present in the cache.
Cannot update stats in cache.");
+ return null;
+ }
+ long writeId = wrapper.getPartition().getWriteId();
+ ValidWriteIdList list4TheQuery = new
ValidReaderWriteIdList(writeIdList);
+ // Just check if the write ID is valid. If it's valid (i.e. we are
allowed to see it),
+ // that means it cannot possibly be a concurrent write. If it's not
valid (we are not
+ // allowed to see it), that means it's either concurrent or aborted,
same thing for us.
+ if (!list4TheQuery.isWriteIdValid(writeId)) {
+ LOG.debug("Write id list " + writeIdList + " is not compatible with
write id " + writeId);
+ return null;
+ }
+ return new ColumStatsWithWriteId(writeId, statisticsObj);
} finally {
tableLock.readLock().unlock();
}
}
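As a standalone illustration of the visibility rule above: the patch itself constructs a ValidReaderWriteIdList from the query's write id list, and the snippet below exercises that check with assumed values. The serialized string format shown is an assumption for the example, not taken from this patch:

    // Demo of the write-id visibility rule used above; values are illustrative.
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    class WriteIdVisibilityDemo {
      public static void main(String[] args) {
        // Assumed format: <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
        ValidWriteIdList validForQuery = new ValidReaderWriteIdList("db.tbl:5:4:4:");
        System.out.println(validForQuery.isWriteIdValid(3)); // true: committed before the query
        System.out.println(validForQuery.isWriteIdValid(4)); // false: open, i.e. concurrent
        System.out.println(validForQuery.isWriteIdValid(6)); // false: above the high watermark
      }
    }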
+ public List<ColumnStatistics> getPartColStatsList(List<String> partNames,
List<String> colNames,
+ String writeIdList, boolean
txnStatSupported) throws MetaException {
+ List<ColumnStatistics> colStatObjs = new ArrayList<>();
+ try {
+ tableLock.readLock().lock();
+ Table tbl = getTable();
+ for (String partName : partNames) {
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(false,
+ tbl.getDbName(), tbl.getTableName());
+ csd.setCatName(tbl.getCatName());
+ csd.setPartName(partName);
+ csd.setLastAnalyzed(0); //TODO : Need to get the last-analyzed time. It is not used by anyone at present.
+ List<ColumnStatisticsObj> statObject = new ArrayList<>();
+ List<String> partVal =
Warehouse.getPartValuesFromPartName(partName);
+ for (String colName : colNames) {
+ ColumnStatisticsObj statisticsObj =
+
partitionColStatsCache.get(CacheUtils.buildPartitonColStatsCacheKey(partVal,
colName));
+ if (statisticsObj != null) {
+ statObject.add(statisticsObj);
+ } else {
+ LOG.info("Stats not available in cachedStore for col " + colName
+ " in partition " + partVal);
+ return null;
+ }
+ }
+ ColumnStatistics columnStatistics = new ColumnStatistics(csd,
statObject);
+ if (writeIdList != null &&
TxnUtils.isTransactionalTable(getParameters())) {
+ columnStatistics.setIsStatsCompliant(true);
+ if (!txnStatSupported) {
+ columnStatistics.setIsStatsCompliant(false);
+ } else {
+ PartitionWrapper wrapper =
+
partitionCache.get(CacheUtils.buildPartitionCacheKey(partVal));
+ if (wrapper == null) {
+ columnStatistics.setIsStatsCompliant(false);
+ } else {
+ Partition partition = wrapper.getPartition();
+ if
(!ObjectStore.isCurrentStatsValidForTheQuery(partition.getParameters(),
+ partition.getWriteId(), writeIdList, false)) {
+ LOG.debug("The current cached store transactional partition
column statistics for {}.{}.{} "
+ + "(write ID {}) are not valid for current
query ({})", tbl.getDbName(),
+ tbl.getTableName(), partName,
partition.getWriteId(), writeIdList);
+ columnStatistics.setIsStatsCompliant(false);
+ }
+ }
+ }
+ }
+ colStatObjs.add(columnStatistics);
+ }
+ } finally {
+ tableLock.readLock().unlock();
+ }
+ return colStatObjs;
+ }
+
public boolean updatePartitionColStats(List<String> partVal,
List<ColumnStatisticsObj> colStatsObjs) {
try {
@@ -661,11 +764,30 @@ public class SharedCache {
}
public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions,
- AggrStats aggrStatsAllButDefaultPartition) {
+ AggrStats aggrStatsAllButDefaultPartition, SharedCache sharedCache,
Map<List<String>, Long> partNameToWriteId) {
Map<String, List<ColumnStatisticsObj>> newAggrColStatsCache =
new HashMap<String, List<ColumnStatisticsObj>>();
try {
tableLock.writeLock().lock();
+ if (partNameToWriteId != null) {
+ for (Entry<List<String>, Long> partValuesWriteIdSet :
partNameToWriteId.entrySet()) {
+ List<String> partValues = partValuesWriteIdSet.getKey();
+ Partition partition = getPartition(partValues, sharedCache);
+ if (partition == null) {
+ LOG.info("Could not refresh the aggregate stat as partition " +
partValues + " does not exist");
+ return;
+ }
+
+ // For txn tables, a changed write id means the partition was updated after the stats were
+ // fetched, so skip updating the aggregate stats in the cache.
+ long writeId = partition.getWriteId();
+ if (writeId != partValuesWriteIdSet.getValue()) {
+   LOG.info("Could not refresh the aggregate stats as partition " + partValues + " has write id " +
+       writeId + " instead of the expected " + partValuesWriteIdSet.getValue());
+ return;
+ }
+ }
+ }
if (aggrStatsAllPartitions != null) {
for (ColumnStatisticsObj statObj :
aggrStatsAllPartitions.getColStats()) {
if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) {
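The partNameToWriteId check above is an optimistic guard: aggregates merged outside the lock are installed only if every involved partition still carries the write id captured during the merge. A minimal sketch under that assumption, with illustrative names not taken from this patch:

    // Sketch of the optimistic write-id guard: a refresh computed outside the lock
    // is installed only if every partition is unchanged since the merge.
    import java.util.HashMap;
    import java.util.Map;

    class AggrRefreshGuardSketch {
      static boolean safeToInstall(Map<String, Long> writeIdAtMerge, Map<String, Long> writeIdNow) {
        for (Map.Entry<String, Long> entry : writeIdAtMerge.entrySet()) {
          Long current = writeIdNow.get(entry.getKey());
          if (current == null || !current.equals(entry.getValue())) {
            return false; // partition dropped or rewritten concurrently: discard the refresh
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, Long> atMerge = new HashMap<>();
        atMerge.put("ds=2019-02-08", 7L);
        Map<String, Long> now = new HashMap<>(atMerge);
        System.out.println(safeToInstall(atMerge, now)); // true: nothing changed
        now.put("ds=2019-02-08", 8L);                    // concurrent update bumped the write id
        System.out.println(safeToInstall(atMerge, now)); // false: refresh must be dropped
      }
    }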
@@ -794,6 +916,23 @@ public class SharedCache {
}
}
+ public static class ColumStatsWithWriteId {
+ private long writeId;
+ private ColumnStatisticsObj columnStatisticsObj;
+ public ColumStatsWithWriteId(long writeId, ColumnStatisticsObj
columnStatisticsObj) {
+ this.writeId = writeId;
+ this.columnStatisticsObj = columnStatisticsObj;
+ }
+
+ public long getWriteId() {
+ return writeId;
+ }
+
+ public ColumnStatisticsObj getColumnStatisticsObj() {
+ return columnStatisticsObj;
+ }
+ }
+
public void populateCatalogsInCache(Collection<Catalog> catalogs) {
for (Catalog cat : catalogs) {
Catalog catCopy = cat.deepCopy();
@@ -1205,6 +1344,30 @@ public class SharedCache {
}
}
+ public void alterTableAndStatsInCache(String catName, String dbName, String
tblName, long writeId,
+ List<ColumnStatisticsObj>
colStatsObjs, Map<String,String> newParams) {
+ try {
+ cacheLock.writeLock().lock();
+ TableWrapper tblWrapper =
+ tableCache.remove(CacheUtils.buildTableKey(catName, dbName,
tblName));
+ if (tblWrapper == null) {
+ LOG.info("Table " + tblName + " is missing from cache. Cannot update
table stats in cache");
+ return;
+ }
+ Table newTable = tblWrapper.getTable();
+ newTable.setWriteId(writeId);
+ newTable.setParameters(newParams);
+ //tblWrapper.updateTableObj(newTable, this);
+ String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName());
+ String newTblName =
StringUtils.normalizeIdentifier(newTable.getTableName());
+ tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName),
tblWrapper);
+ tblWrapper.updateTableColStats(colStatsObjs);
+ isTableCacheDirty.set(true);
+ } finally {
+ cacheLock.writeLock().unlock();
+ }
+ }
+
public List<Table> listCachedTables(String catName, String dbName) {
List<Table> tables = new ArrayList<>();
try {
@@ -1299,19 +1462,20 @@ public class SharedCache {
}
}
- public List<ColumnStatisticsObj> getTableColStatsFromCache(String catName,
String dbName,
- String tblName, List<String> colNames) {
- List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
+ public ColumnStatistics getTableColStatsFromCache(String catName, String
dbName,
+ String tblName, List<String> colNames, String validWriteIds, boolean
areTxnStatsSupported) throws MetaException {
try {
cacheLock.readLock().lock();
TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
- if (tblWrapper != null) {
- colStatObjs = tblWrapper.getCachedTableColStats(colNames);
+ if (tblWrapper == null) {
+ LOG.info("Table " + tblName + " is missing from cache.");
+ return null;
}
+ ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName,
tblName);
+ return tblWrapper.getCachedTableColStats(csd, colNames, validWriteIds,
areTxnStatsSupported);
} finally {
cacheLock.readLock().unlock();
}
- return colStatObjs;
}
public void removeTableColStatsFromCache(String catName, String dbName,
String tblName,
@@ -1321,6 +1485,8 @@ public class SharedCache {
TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removeTableColStats(colName);
+ } else {
+ LOG.info("Table " + tblName + " is missing from cache.");
}
} finally {
cacheLock.readLock().unlock();
@@ -1333,6 +1499,8 @@ public class SharedCache {
TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.removeAllTableColStats();
+ } else {
+ LOG.info("Table " + tblName + " is missing from cache.");
}
} finally {
cacheLock.readLock().unlock();
@@ -1347,6 +1515,8 @@ public class SharedCache {
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
tblWrapper.updateTableColStats(colStatsForTable);
+ } else {
+ LOG.info("Table " + tableName + " is missing from cache.");
}
} finally {
cacheLock.readLock().unlock();
@@ -1361,6 +1531,8 @@ public class SharedCache {
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName));
if (tblWrapper != null) {
tblWrapper.refreshTableColStats(colStatsForTable);
+ } else {
+ LOG.info("Table " + tableName + " is missing from cache.");
}
} finally {
cacheLock.readLock().unlock();
@@ -1513,6 +1685,20 @@ public class SharedCache {
}
}
+ public void alterPartitionAndStatsInCache(String catName, String dbName,
String tblName, long writeId,
+ List<String> partVals,
Map<String,String> parameters,
+ List<ColumnStatisticsObj>
colStatsObjs) {
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ if (tblWrapper != null) {
+ tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters,
colStatsObjs);
+ }
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ }
+
public void alterPartitionsInCache(String catName, String dbName, String
tblName,
List<List<String>> partValsList, List<Partition> newParts) {
try {
@@ -1578,14 +1764,14 @@ public class SharedCache {
}
}
- public ColumnStatisticsObj getPartitionColStatsFromCache(String catName,
String dbName,
- String tblName, List<String> partVal, String colName) {
- ColumnStatisticsObj colStatObj = null;
+ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName,
String dbName,
+ String tblName, List<String> partVal, String colName, String
writeIdList) {
+ ColumStatsWithWriteId colStatObj = null;
try {
cacheLock.readLock().lock();
TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
- colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
+ colStatObj = tblWrapper.getPartitionColStats(partVal, colName,
writeIdList);
}
} finally {
cacheLock.readLock().unlock();
@@ -1593,6 +1779,24 @@ public class SharedCache {
return colStatObj;
}
+ public List<ColumnStatistics> getPartitionColStatsListFromCache(String
catName, String dbName, String tblName,
+ List<String>
partNames, List<String> colNames,
+ String
writeIdList, boolean txnStatSupported) {
+ List<ColumnStatistics> colStatObjs = null;
+ try {
+ cacheLock.readLock().lock();
+ TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
+ if (tblWrapper != null) {
+ colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames,
writeIdList, txnStatSupported);
+ }
+ } catch (MetaException e) {
+ LOG.warn("Failed to get partition column statistics");
+ } finally {
+ cacheLock.readLock().unlock();
+ }
+ return colStatObjs;
+ }
+
public void refreshPartitionColStatsInCache(String catName, String dbName,
String tblName,
List<ColumnStatistics> partitionColStats) {
try {
@@ -1635,13 +1839,14 @@ public class SharedCache {
}
public void refreshAggregateStatsInCache(String catName, String dbName,
String tblName,
- AggrStats aggrStatsAllPartitions, AggrStats
aggrStatsAllButDefaultPartition) {
+ AggrStats aggrStatsAllPartitions, AggrStats
aggrStatsAllButDefaultPartition,
+ Map<List<String>, Long>
partNameToWriteId) {
try {
cacheLock.readLock().lock();
TableWrapper tblWrapper =
tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName));
if (tblWrapper != null) {
tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
- aggrStatsAllButDefaultPartition);
+ aggrStatsAllButDefaultPartition, this, partNameToWriteId);
}
} finally {
cacheLock.readLock().unlock();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
index 094f799..ba61a08 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdatePartitionColumnStatEvent.java
@@ -35,7 +35,6 @@ import java.util.Map;
@InterfaceStability.Stable
public class UpdatePartitionColumnStatEvent extends ListenerEvent {
private ColumnStatistics partColStats;
- private String validWriteIds;
private long writeId;
private Map<String, String> parameters;
private List<String> partVals;
@@ -45,16 +44,14 @@ public class UpdatePartitionColumnStatEvent extends
ListenerEvent {
* @param statsObj Columns statistics Info.
* @param partVals partition names
* @param parameters table parameters to be updated after stats are updated.
- * @param validWriteIds valid write id list for the query.
+ * @param tableObj table object
* @param writeId writeId for the query.
* @param handler handler that is firing the event
*/
public UpdatePartitionColumnStatEvent(ColumnStatistics statsObj,
List<String> partVals, Map<String, String> parameters,
- Table tableObj, String validWriteIds,
long writeId,
- IHMSHandler handler) {
+ Table tableObj, long writeId,
IHMSHandler handler) {
super(true, handler);
this.partColStats = statsObj;
- this.validWriteIds = validWriteIds;
this.writeId = writeId;
this.parameters = parameters;
this.partVals = partVals;
@@ -71,7 +68,6 @@ public class UpdatePartitionColumnStatEvent extends
ListenerEvent {
super(true, handler);
this.partColStats = statsObj;
this.partVals = partVals;
- this.validWriteIds = null;
this.writeId = 0;
this.parameters = null;
this.tableObj = tableObj;
@@ -81,10 +77,6 @@ public class UpdatePartitionColumnStatEvent extends
ListenerEvent {
return partColStats;
}
- public String getValidWriteIds() {
- return validWriteIds;
- }
-
public long getWriteId() {
return writeId;
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
index 3f988bb..71300ab 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/UpdateTableColumnStatEvent.java
@@ -35,24 +35,22 @@ import java.util.Map;
@InterfaceStability.Stable
public class UpdateTableColumnStatEvent extends ListenerEvent {
private ColumnStatistics colStats;
- private String validWriteIds;
private long writeId;
private Map<String, String> parameters;
private Table tableObj;
/**
* @param colStats Columns statistics Info.
+ * @param tableObj table object
* @param parameters table parameters to be updated after stats are updated.
- * @param validWriteIds valid write id list for the query.
- * @param colStats writeId for the query.
+ * @param writeId writeId for the query.
* @param handler handler that is firing the event
*/
public UpdateTableColumnStatEvent(ColumnStatistics colStats, Table tableObj,
- Map<String, String> parameters, String
validWriteIds,
+ Map<String, String> parameters,
long writeId, IHMSHandler handler) {
super(true, handler);
this.colStats = colStats;
- this.validWriteIds = validWriteIds;
this.writeId = writeId;
this.parameters = parameters;
this.tableObj = tableObj;
@@ -65,7 +63,6 @@ public class UpdateTableColumnStatEvent extends ListenerEvent
{
public UpdateTableColumnStatEvent(ColumnStatistics colStats, IHMSHandler
handler) {
super(true, handler);
this.colStats = colStats;
- this.validWriteIds = null;
this.writeId = 0;
this.parameters = null;
this.tableObj = null;
@@ -75,10 +72,6 @@ public class UpdateTableColumnStatEvent extends
ListenerEvent {
return colStats;
}
- public String getValidWriteIds() {
- return validWriteIds;
- }
-
public long getWriteId() {
return writeId;
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
index 10c6b44..15c4769 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageBuilder.java
@@ -289,9 +289,9 @@ public class MessageBuilder {
public JSONUpdateTableColumnStatMessage
buildUpdateTableColumnStatMessage(ColumnStatistics colStats,
Table tableObj,
Map<String, String> parameters,
-
String validWriteIds, long writeId) {
+
long writeId) {
return new JSONUpdateTableColumnStatMessage(MS_SERVER_URL,
MS_SERVICE_PRINCIPAL, now(),
- colStats, tableObj, parameters, validWriteIds, writeId);
+ colStats, tableObj, parameters, writeId);
}
public JSONDeleteTableColumnStatMessage
buildDeleteTableColumnStatMessage(String dbName, String colName) {
@@ -300,9 +300,9 @@ public class MessageBuilder {
public JSONUpdatePartitionColumnStatMessage
buildUpdatePartitionColumnStatMessage(ColumnStatistics colStats,
List<String>
partVals, Map<String, String> parameters,
- Table tableObj,
String validWriteIds, long writeId) {
+ Table tableObj,
long writeId) {
return new JSONUpdatePartitionColumnStatMessage(MS_SERVER_URL,
MS_SERVICE_PRINCIPAL, now(), colStats, partVals,
- parameters, tableObj, validWriteIds, writeId);
+ parameters, tableObj, writeId);
}
public JSONDeletePartitionColumnStatMessage
buildDeletePartitionColumnStatMessage(String dbName, String colName,
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
index 7eb6c07..e92a0dc 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdatePartitionColumnStatMessage.java
@@ -34,8 +34,6 @@ public abstract class UpdatePartitionColumnStatMessage
extends EventMessage {
public abstract ColumnStatistics getColumnStatistics();
- public abstract String getValidWriteIds();
-
public abstract Long getWriteId();
public abstract Map<String, String> getParameters();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
index 7919b0e..e3f049c 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/UpdateTableColumnStatMessage.java
@@ -33,8 +33,6 @@ public abstract class UpdateTableColumnStatMessage extends
EventMessage {
public abstract ColumnStatistics getColumnStatistics();
- public abstract String getValidWriteIds();
-
public abstract Long getWriteId();
public abstract Map<String, String> getParameters();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
index 1b35df5..fd7fe00 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdatePartitionColumnStatMessage.java
@@ -38,7 +38,7 @@ public class JSONUpdatePartitionColumnStatMessage extends
UpdatePartitionColumnS
private Long writeId, timestamp;
@JsonProperty
- private String validWriteIds, server, servicePrincipal, database;
+ private String server, servicePrincipal, database;
@JsonProperty
private String colStatsJson;
@@ -61,12 +61,11 @@ public class JSONUpdatePartitionColumnStatMessage extends
UpdatePartitionColumnS
public JSONUpdatePartitionColumnStatMessage(String server, String
servicePrincipal, Long timestamp,
ColumnStatistics colStats,
List<String> partVals,
Map<String, String> parameters,
- Table tableObj, String
validWriteIds, long writeId) {
+ Table tableObj, long writeId) {
this.timestamp = timestamp;
this.server = server;
this.servicePrincipal = servicePrincipal;
this.writeId = writeId;
- this.validWriteIds = validWriteIds;
this.database = colStats.getStatsDesc().getDbName();
this.partVals = partVals;
try {
@@ -108,11 +107,6 @@ public class JSONUpdatePartitionColumnStatMessage extends
UpdatePartitionColumnS
}
@Override
- public String getValidWriteIds() {
- return validWriteIds;
- }
-
- @Override
public Long getWriteId() {
return writeId;
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
index c932b7c..275d204 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONUpdateTableColumnStatMessage.java
@@ -36,7 +36,7 @@ public class JSONUpdateTableColumnStatMessage extends
UpdateTableColumnStatMessa
private Long writeId, timestamp;
@JsonProperty
- private String validWriteIds, server, servicePrincipal, database;
+ private String server, servicePrincipal, database;
@JsonProperty
private String colStatsJson;
@@ -55,12 +55,11 @@ public class JSONUpdateTableColumnStatMessage extends
UpdateTableColumnStatMessa
public JSONUpdateTableColumnStatMessage(String server, String
servicePrincipal, Long timestamp,
ColumnStatistics colStats, Table tableObj, Map<String,
String> parameters,
- String validWriteIds, long writeId) {
+ long writeId) {
this.timestamp = timestamp;
this.server = server;
this.servicePrincipal = servicePrincipal;
this.writeId = writeId;
- this.validWriteIds = validWriteIds;
this.database = colStats.getStatsDesc().getDbName();
try {
this.colStatsJson = MessageBuilder.createTableColumnStatJson(colStats);
@@ -106,11 +105,6 @@ public class JSONUpdateTableColumnStatMessage extends
UpdateTableColumnStatMessa
}
@Override
- public String getValidWriteIds() {
- return validWriteIds;
- }
-
- @Override
public Long getWriteId() {
return writeId;
}
diff --git
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 2f102a2..02ff4ae 100644
---
a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -177,6 +177,8 @@ CREATE TABLE "APP"."NOTIFICATION_LOG" (
"MESSAGE_FORMAT" VARCHAR(16)
);
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON
"APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
CREATE TABLE "APP"."NOTIFICATION_SEQUENCE" ("NNI_ID" BIGINT NOT NULL,
"NEXT_EVENT_ID" BIGINT NOT NULL);
CREATE TABLE "APP"."KEY_CONSTRAINTS" ("CHILD_CD_ID" BIGINT,
"CHILD_INTEGER_IDX" INTEGER, "CHILD_TBL_ID" BIGINT, "PARENT_CD_ID" BIGINT ,
"PARENT_INTEGER_IDX" INTEGER, "PARENT_TBL_ID" BIGINT NOT NULL, "POSITION"
BIGINT NOT NULL, "CONSTRAINT_NAME" VARCHAR(400) NOT NULL, "CONSTRAINT_TYPE"
SMALLINT NOT NULL, "UPDATE_RULE" SMALLINT, "DELETE_RULE" SMALLINT,
"ENABLE_VALIDATE_RELY" SMALLINT NOT NULL, "DEFAULT_VALUE" VARCHAR(400));
diff --git
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 8ad56fc..1a1e34a 100644
---
a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -9,6 +9,9 @@ UPDATE "APP"."WM_RESOURCEPLAN" SET NS = 'default' WHERE NS IS
NULL;
DROP INDEX "APP"."UNIQUE_WM_RESOURCEPLAN";
CREATE UNIQUE INDEX "APP"."UNIQUE_WM_RESOURCEPLAN" ON "APP"."WM_RESOURCEPLAN"
("NS", "NAME");
+-- HIVE-21063
+CREATE UNIQUE INDEX "APP"."NOTIFICATION_LOG_EVENT_ID" ON
"APP"."NOTIFICATION_LOG" ("EVENT_ID");
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 383d3bc..4f58343 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -629,6 +629,8 @@ CREATE TABLE NOTIFICATION_LOG
ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY
(NL_ID);
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
CREATE TABLE NOTIFICATION_SEQUENCE
(
NNI_ID bigint NOT NULL,
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index edde08d..e0d143a 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -10,6 +10,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
DROP INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN;
CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN ("NS", "NAME");
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG (EVENT_ID);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 5466537..8db11d3 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -871,6 +871,8 @@ CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
PRIMARY KEY (`NL_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG
(`EVENT_ID`) USING BTREE;
+
CREATE TABLE IF NOT EXISTS `NOTIFICATION_SEQUENCE`
(
`NNI_ID` BIGINT(20) NOT NULL,
diff --git
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 701acb0..47c3831 100644
---
a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -11,6 +11,9 @@ UPDATE `WM_RESOURCEPLAN` SET `NS` = 'default' WHERE `NS` IS
NULL;
ALTER TABLE `WM_RESOURCEPLAN` DROP KEY `UNIQUE_WM_RESOURCEPLAN`;
ALTER TABLE `WM_RESOURCEPLAN` ADD UNIQUE KEY `UNIQUE_WM_RESOURCEPLAN` (`NAME`,
`NS`);
+-- HIVE-21063
+CREATE UNIQUE INDEX `NOTIFICATION_LOG_EVENT_ID` ON NOTIFICATION_LOG
(`EVENT_ID`) USING BTREE;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS ' ';
diff --git
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index 2a9c38f..8af9a76 100644
---
a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -624,6 +624,8 @@ CREATE TABLE NOTIFICATION_LOG
ALTER TABLE NOTIFICATION_LOG ADD CONSTRAINT NOTIFICATION_LOG_PK PRIMARY KEY
(NL_ID);
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
CREATE TABLE NOTIFICATION_SEQUENCE
(
NNI_ID NUMBER NOT NULL,
diff --git
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index b9f6331..231376b 100644
---
a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -9,6 +9,9 @@ UPDATE WM_RESOURCEPLAN SET NS = 'default' WHERE NS IS NULL;
DROP INDEX UNIQUE_WM_RESOURCEPLAN;
CREATE UNIQUE INDEX UNIQUE_WM_RESOURCEPLAN ON WM_RESOURCEPLAN (NS, "NAME");
+-- HIVE-21063
+CREATE UNIQUE INDEX NOTIFICATION_LOG_EVENT_ID ON NOTIFICATION_LOG(EVENT_ID);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release
version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status
from dual;
diff --git
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 0a359d9..2aff1e4 100644
---
a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -631,6 +631,8 @@ CREATE TABLE "NOTIFICATION_LOG"
PRIMARY KEY ("NL_ID")
);
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING
btree ("EVENT_ID");
+
CREATE TABLE "NOTIFICATION_SEQUENCE"
(
"NNI_ID" BIGINT NOT NULL,
diff --git
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 0c36069..2d4363b 100644
---
a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -11,6 +11,9 @@ UPDATE "WM_RESOURCEPLAN" SET "NS" = 'default' WHERE "NS" IS
NULL;
ALTER TABLE "WM_RESOURCEPLAN" DROP CONSTRAINT "UNIQUE_WM_RESOURCEPLAN";
ALTER TABLE ONLY "WM_RESOURCEPLAN" ADD CONSTRAINT "UNIQUE_WM_RESOURCEPLAN"
UNIQUE ("NS", "NAME");
+-- HIVE-21063
+CREATE UNIQUE INDEX "NOTIFICATION_LOG_EVENT_ID" ON "NOTIFICATION_LOG" USING
btree ("EVENT_ID");
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release
version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
index 7429d18..77e0c98 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
@@ -95,6 +95,7 @@ import
org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TException;
import org.junit.Test;
@@ -3325,22 +3326,23 @@ public abstract class TestHiveMetaStore {
Warehouse wh = mock(Warehouse.class);
//Execute initializeAddedPartition() and it should not trigger
updatePartitionStatsFast() as DO_NOT_UPDATE_STATS is true
HiveMetaStore.HMSHandler hms = new HiveMetaStore.HMSHandler("", conf,
false);
- Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition",
Table.class, Partition.class, boolean.class);
+ Method m = hms.getClass().getDeclaredMethod("initializeAddedPartition",
Table.class, Partition.class,
+ boolean.class, EnvironmentContext.class);
m.setAccessible(true);
//Invoke initializeAddedPartition();
- m.invoke(hms, tbl, part, false);
+ m.invoke(hms, tbl, part, false, null);
verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
//Remove tbl's DO_NOT_UPDATE_STATS & set STATS_AUTO_GATHER = false
tbl.unsetParameters();
MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, false);
- m.invoke(hms, tbl, part, false);
+ m.invoke(hms, tbl, part, false, null);
verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
//Set STATS_AUTO_GATHER = true and set tbl as a VIRTUAL_VIEW
MetastoreConf.setBoolVar(conf, ConfVars.STATS_AUTO_GATHER, true);
tbl.setTableType("VIRTUAL_VIEW");
- m.invoke(hms, tbl, part, false);
+ m.invoke(hms, tbl, part, false, null);
verify(wh, never()).getFileStatusesForLocation(part.getSd().getLocation());
}
}