http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index cb899d7..9e4f3c2 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1135,6 +1135,7 @@ struct CreationMetadata { struct NotificationEventRequest { 1: required i64 lastEvent, 2: optional i32 maxEvents, + 3: optional list<string> eventTypeSkipList, } struct NotificationEvent {
http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index 0ea46f8..617c7bc 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -274,7 +274,7 @@ public class HiveAlterHandler implements AlterHandler { part.setDbName(newDbName); part.setTableName(newTblName); ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name, - part.getValues(), part.getSd().getCols(), oldt, part, null); + part.getValues(), part.getSd().getCols(), oldt, part, null, null); if (colStats != null) { columnStatsNeedUpdated.put(part, colStats); } @@ -312,7 +312,7 @@ public class HiveAlterHandler implements AlterHandler { } } else { alterTableUpdateTableColumnStats( - msdb, oldt, newt, environmentContext, writeIdList); + msdb, oldt, newt, environmentContext, writeIdList, conf, null); } } else { // operations other than table rename @@ -332,7 +332,7 @@ public class HiveAlterHandler implements AlterHandler { List<FieldSchema> oldCols = part.getSd().getCols(); part.getSd().setCols(newt.getSd().getCols()); ColumnStatistics colStats = updateOrGetPartitionColumnStats(msdb, catName, dbname, name, - part.getValues(), oldCols, oldt, part, null); + part.getValues(), oldCols, oldt, part, null, null); assert(colStats == null); if (cascade) { msdb.alterPartition( @@ -349,11 +349,11 @@ public class HiveAlterHandler implements AlterHandler { } else { LOG.warn("Alter table not cascaded to 
partitions."); alterTableUpdateTableColumnStats( - msdb, oldt, newt, environmentContext, writeIdList); + msdb, oldt, newt, environmentContext, writeIdList, conf, null); } } else { alterTableUpdateTableColumnStats( - msdb, oldt, newt, environmentContext, writeIdList); + msdb, oldt, newt, environmentContext, writeIdList, conf, null); } } @@ -481,7 +481,7 @@ public class HiveAlterHandler implements AlterHandler { // PartitionView does not have SD. We do not need update its column stats if (oldPart.getSd() != null) { updateOrGetPartitionColumnStats(msdb, catName, dbname, name, new_part.getValues(), - oldPart.getSd().getCols(), tbl, new_part, null); + oldPart.getSd().getCols(), tbl, new_part, null, null); } msdb.alterPartition( catName, dbname, name, new_part.getValues(), new_part, validWriteIds); @@ -620,7 +620,7 @@ public class HiveAlterHandler implements AlterHandler { String newPartName = Warehouse.makePartName(tbl.getPartitionKeys(), new_part.getValues()); ColumnStatistics cs = updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldPart.getValues(), - oldPart.getSd().getCols(), tbl, new_part, null); + oldPart.getSd().getCols(), tbl, new_part, null, null); msdb.alterPartition(catName, dbname, name, part_vals, new_part, validWriteIds); if (cs != null) { cs.getStatsDesc().setPartName(newPartName); @@ -727,7 +727,7 @@ public class HiveAlterHandler implements AlterHandler { // PartitionView does not have SD and we do not need to update its column stats if (oldTmpPart.getSd() != null) { updateOrGetPartitionColumnStats(msdb, catName, dbname, name, oldTmpPart.getValues(), - oldTmpPart.getSd().getCols(), tbl, tmpPart, null); + oldTmpPart.getSd().getCols(), tbl, tmpPart, null, null); } } @@ -799,8 +799,8 @@ public class HiveAlterHandler implements AlterHandler { } @VisibleForTesting - void alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable, - EnvironmentContext ec, String validWriteIds) + public static List<ColumnStatisticsObj> 
alterTableUpdateTableColumnStats(RawStore msdb, Table oldTable, Table newTable, + EnvironmentContext ec, String validWriteIds, Configuration conf, List<String> deletedCols) throws MetaException, InvalidObjectException { String catName = normalizeIdentifier(oldTable.isSetCatName() ? oldTable.getCatName() : getDefaultCatalog(conf)); @@ -808,11 +808,13 @@ public class HiveAlterHandler implements AlterHandler { String tableName = normalizeIdentifier(oldTable.getTableName()); String newDbName = newTable.getDbName().toLowerCase(); String newTableName = normalizeIdentifier(newTable.getTableName()); + List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>(); + // If this is not called from the cached store (deletedCols == null), then update the table. + boolean doAlterTable = deletedCols == null; try { List<FieldSchema> oldCols = oldTable.getSd().getCols(); List<FieldSchema> newCols = newTable.getSd().getCols(); - List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>(); ColumnStatistics colStats = null; boolean updateColumnStats = !newDbName.equals(dbName) || !newTableName.equals(tableName) || !MetaStoreServerUtils.columnsIncludedByNameType(oldCols, newCols); @@ -834,7 +836,10 @@ public class HiveAlterHandler implements AlterHandler { } else { List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); if (statsObjs != null) { - List<String> deletedCols = new ArrayList<>(); + // deletedCols is an out parameter; when used as one, it is initialized by the caller. 
+ if (deletedCols == null) { + deletedCols = new ArrayList<>(); + } for (ColumnStatisticsObj statsObj : statsObjs) { boolean found = false; for (FieldSchema newCol : newCols) { @@ -847,28 +852,36 @@ public class HiveAlterHandler implements AlterHandler { if (found) { if (!newDbName.equals(dbName) || !newTableName.equals(tableName)) { - msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName()); + if (doAlterTable) { + msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName()); + } newStatsObjs.add(statsObj); deletedCols.add(statsObj.getColName()); } } else { - msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName()); + if (doAlterTable) { + msdb.deleteTableColumnStatistics(catName, dbName, tableName, statsObj.getColName()); + } deletedCols.add(statsObj.getColName()); } } - StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols); + if (doAlterTable) { + StatsSetupConst.removeColumnStatsState(newTable.getParameters(), deletedCols); + } } } } - // Change to new table and append stats for the new table - msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds); - if (updateColumnStats && !newStatsObjs.isEmpty()) { - ColumnStatisticsDesc statsDesc = colStats.getStatsDesc(); - statsDesc.setDbName(newDbName); - statsDesc.setTableName(newTableName); - colStats.setStatsObj(newStatsObjs); - msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId()); + if (doAlterTable) { + // Change to new table and append stats for the new table + msdb.alterTable(catName, dbName, tableName, newTable, validWriteIds); + if (updateColumnStats && !newStatsObjs.isEmpty()) { + ColumnStatisticsDesc statsDesc = colStats.getStatsDesc(); + statsDesc.setDbName(newDbName); + statsDesc.setTableName(newTableName); + colStats.setStatsObj(newStatsObjs); + msdb.updateTableColumnStatistics(colStats, validWriteIds, newTable.getWriteId()); + } } } catch (NoSuchObjectException 
nsoe) { LOG.debug("Could not find db entry." + nsoe); @@ -876,13 +889,15 @@ public class HiveAlterHandler implements AlterHandler { //should not happen since the input were verified before passed in throw new InvalidObjectException("Invalid inputs to update table column stats: " + e); } + return newStatsObjs; } - private ColumnStatistics updateOrGetPartitionColumnStats( + public static ColumnStatistics updateOrGetPartitionColumnStats( RawStore msdb, String catName, String dbname, String tblname, List<String> partVals, - List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols) + List<FieldSchema> oldCols, Table table, Partition part, List<FieldSchema> newCols, List<String> deletedCols) throws MetaException, InvalidObjectException { ColumnStatistics newPartsColStats = null; + boolean updateColumnStats = true; try { // if newCols are not specified, use default ones. if (newCols == null) { @@ -906,10 +921,17 @@ public class HiveAlterHandler implements AlterHandler { List<ColumnStatistics> partsColStats = msdb.getPartitionColumnStatistics(catName, dbname, tblname, oldPartNames, oldColNames); assert (partsColStats.size() <= 1); + + // deletedCols is an out parameter; when used as one, it is initialized by the caller. + if (deletedCols == null) { + deletedCols = new ArrayList<>(); + } else { + // in case deletedCols is provided by caller, stats will be updated by caller. 
+ updateColumnStats = false; + } for (ColumnStatistics partColStats : partsColStats) { //actually only at most one loop List<ColumnStatisticsObj> newStatsObjs = new ArrayList<>(); List<ColumnStatisticsObj> statsObjs = partColStats.getStatsObj(); - List<String> deletedCols = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { boolean found =false; for (FieldSchema newCol : newCols) { @@ -921,17 +943,25 @@ public class HiveAlterHandler implements AlterHandler { } if (found) { if (rename) { - msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(), - partVals, statsObj.getColName()); + if (updateColumnStats) { + msdb.deletePartitionColumnStatistics(catName, dbname, tblname, + partColStats.getStatsDesc().getPartName(), partVals, statsObj.getColName()); + } else { + deletedCols.add(statsObj.getColName()); + } newStatsObjs.add(statsObj); } } else { - msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(), - partVals, statsObj.getColName()); + if (updateColumnStats) { + msdb.deletePartitionColumnStatistics(catName, dbname, tblname, partColStats.getStatsDesc().getPartName(), + partVals, statsObj.getColName()); + } deletedCols.add(statsObj.getColName()); } } - StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols); + if (updateColumnStats) { + StatsSetupConst.removeColumnStatsState(part.getParameters(), deletedCols); + } if (!newStatsObjs.isEmpty()) { partColStats.setStatsObj(newStatsObjs); newPartsColStats = partColStats; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 598847d..0a1b96d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -114,6 +114,7 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.events.OpenTxnEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent; import org.apache.hadoop.hive.metastore.events.PreAlterCatalogEvent; import org.apache.hadoop.hive.metastore.events.PreAlterDatabaseEvent; @@ -140,6 +141,9 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent; import org.apache.hadoop.hive.metastore.events.PreReadISchemaEvent; import org.apache.hadoop.hive.metastore.events.PreReadTableEvent; import org.apache.hadoop.hive.metastore.events.PreReadhSchemaVersionEvent; +import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.metrics.JvmPauseMonitor; import org.apache.hadoop.hive.metastore.metrics.Metrics; @@ -568,6 +572,19 @@ public class HiveMetaStore extends ThriftHiveMetastore { listeners.add(new HMSMetricsListener(conf)); } + boolean canCachedStoreCanUseEvent = false; + for (MetaStoreEventListener listener : transactionalListeners) { + if (listener.doesAddEventsToNotificationLogTable()) { + canCachedStoreCanUseEvent = true; + break; + } + } + if 
(conf.getBoolean(ConfVars.METASTORE_CACHE_CAN_USE_EVENT.getVarname(), false) && + !canCachedStoreCanUseEvent) { + throw new MetaException("CahcedStore can not use events for invalidation as there is no " + + " TransactionalMetaStoreEventListener to add events to notification table"); + } + endFunctionListeners = MetaStoreServerUtils.getMetaStoreListeners( MetaStoreEndFunctionListener.class, conf, MetastoreConf.getVar(conf, ConfVars.END_FUNCTION_LISTENERS)); @@ -5786,14 +5803,33 @@ public class HiveMetaStore extends ThriftHiveMetastore { colStats.getStatsDesc().getCatName(), colStats.getStatsDesc().getDbName(), colStats.getStatsDesc().getTableName())); - boolean ret = false; + Map<String, String> parameters = null; + getMS().openTransaction(); + boolean committed = false; try { - ret = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId) != null; + parameters = getMS().updateTableColumnStatistics(colStats, validWriteIds, writeId); + if (parameters != null) { + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.UPDATE_TABLE_COLUMN_STAT, + new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this)); + } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.UPDATE_TABLE_COLUMN_STAT, + new UpdateTableColumnStatEvent(colStats, parameters, validWriteIds, writeId, this)); + } + } + committed = getMS().commitTransaction(); } finally { - endFunction("write_column_statistics", ret != false, null, + if (!committed) { + getMS().rollbackTransaction(); + } + endFunction("write_column_statistics", parameters != null, null, colStats.getStatsDesc().getTableName()); } - return ret; + + return parameters != null; } private void normalizeColStatsInput(ColumnStatistics colStats) throws MetaException { @@ -5826,16 +5862,37 @@ public class HiveMetaStore extends ThriftHiveMetastore { boolean ret = false; + 
Map<String, String> parameters; + List<String> partVals; + boolean committed = false; + getMS().openTransaction(); try { if (tbl == null) { tbl = getTable(catName, dbName, tableName); } - List<String> partVals = getPartValsFromName(tbl, csd.getPartName()); - return getMS().updatePartitionColumnStatistics( - colStats, partVals, validWriteIds, writeId) != null; + partVals = getPartValsFromName(tbl, csd.getPartName()); + parameters = getMS().updatePartitionColumnStatistics(colStats, partVals, validWriteIds, writeId); + if (parameters != null) { + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.UPDATE_PARTITION_COLUMN_STAT, + new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this)); + } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.UPDATE_PARTITION_COLUMN_STAT, + new UpdatePartitionColumnStatEvent(colStats, partVals, parameters, validWriteIds, writeId, this)); + } + } + committed = getMS().commitTransaction(); } finally { + if (!committed) { + getMS().rollbackTransaction(); + } endFunction("write_partition_column_statistics", ret != false, null, tableName); } + + return parameters != null; } @Override @@ -5889,6 +5946,20 @@ public class HiveMetaStore extends ThriftHiveMetastore { ret = getMS().deletePartitionColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, convertedPartName, partVals, colName); + if (ret) { + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DELETE_PARTITION_COLUMN_STAT, + new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, + convertedPartName, partVals, colName, this)); + } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DELETE_PARTITION_COLUMN_STAT, + 
new DeletePartitionColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, + convertedPartName, partVals, colName, this)); + } + } committed = getMS().commitTransaction(); } finally { if (!committed) { @@ -5926,6 +5997,20 @@ public class HiveMetaStore extends ThriftHiveMetastore { } ret = getMS().deleteTableColumnStatistics(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName, colName); + if (ret) { + if (transactionalListeners != null && !transactionalListeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(transactionalListeners, + EventType.DELETE_TABLE_COLUMN_STAT, + new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], + tableName, colName, this)); + } + if (!listeners.isEmpty()) { + MetaStoreListenerNotifier.notifyEvent(listeners, + EventType.DELETE_TABLE_COLUMN_STAT, + new DeleteTableColumnStatEvent(parsedDbName[CAT_NAME], parsedDbName[DB_NAME], + tableName, colName, this)); + } + } committed = getMS().commitTransaction(); } finally { if (!committed) { http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java index de226bf..0d6d10f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java @@ -56,6 +56,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; import 
org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import java.sql.Connection; @@ -294,6 +298,49 @@ public abstract class MetaStoreEventListener implements Configurable { throws MetaException { } + /** + * This will be called to update table column stats + * @param updateTableColumnStatEvent event to be processed + * @throws MetaException + */ + public void onUpdateTableColumnStat(UpdateTableColumnStatEvent updateTableColumnStatEvent) + throws MetaException { + } + + /** + * This will be called to delete table column stats + * @param deleteTableColumnStatEvent event to be processed + * @throws MetaException + */ + public void onDeleteTableColumnStat(DeleteTableColumnStatEvent deleteTableColumnStatEvent) + throws MetaException { + } + + /** + * This will be called to update partition column stats + * @param updatePartColStatEvent event to be processed + * @throws MetaException + */ + public void onUpdatePartitionColumnStat(UpdatePartitionColumnStatEvent updatePartColStatEvent) + throws MetaException { + } + + /** + * This will be called to delete partition column stats + * @param deletePartColStatEvent event to be processed + * @throws MetaException + */ + public void onDeletePartitionColumnStat(DeletePartitionColumnStatEvent deletePartColStatEvent) + throws MetaException { + } + + /** + * This is to check if the listener adds the event info to notification log table. 
+ */ + public boolean doesAddEventsToNotificationLogTable() { + return false; + } + @Override public Configuration getConf() { return this.conf; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java index c296f57..dd82c4b 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java @@ -55,6 +55,10 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent; import org.apache.hadoop.hive.metastore.events.AbortTxnEvent; import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent; import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; +import org.apache.hadoop.hive.metastore.events.UpdateTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeleteTableColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.UpdatePartitionColumnStatEvent; +import org.apache.hadoop.hive.metastore.events.DeletePartitionColumnStatEvent; import org.apache.hadoop.hive.metastore.tools.SQLGenerator; import java.sql.Connection; import java.util.List; @@ -224,6 +228,14 @@ public class MetaStoreListenerNotifier { (listener, event) -> listener.onAllocWriteId((AllocWriteIdEvent) event, null, null)) .put(EventType.ACID_WRITE, (listener, event) -> listener.onAcidWrite((AcidWriteEvent) event, null, null)) + .put(EventType.UPDATE_TABLE_COLUMN_STAT, + (listener, event) -> listener.onUpdateTableColumnStat((UpdateTableColumnStatEvent) event)) + 
.put(EventType.DELETE_TABLE_COLUMN_STAT, + (listener, event) -> listener.onDeleteTableColumnStat((DeleteTableColumnStatEvent) event)) + .put(EventType.UPDATE_PARTITION_COLUMN_STAT, + (listener, event) -> listener.onUpdatePartitionColumnStat((UpdatePartitionColumnStatEvent) event)) + .put(EventType.DELETE_PARTITION_COLUMN_STAT, + (listener, event) -> listener.onDeletePartitionColumnStat((DeletePartitionColumnStatEvent) event)) .build() ); http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 0324a19..d43c0c1 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -4236,7 +4236,7 @@ public class ObjectStore implements RawStore, Configurable { * Verifies that the stats JSON string is unchanged for alter table (txn stats). * @return Error message with the details of the change, or null if the value has not changed. */ - private static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP, + public static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP, long writeId, String validWriteIds, boolean isColStatsChange) { if (validWriteIds != null && writeId > 0) return null; // We have txn context. String oldVal = oldP == null ? 
null : oldP.get(StatsSetupConst.COLUMN_STATS_ACCURATE); @@ -9859,13 +9859,25 @@ public class ObjectStore implements RawStore, Configurable { try { openTransaction(); long lastEvent = rqst.getLastEvent(); - query = pm.newQuery(MNotificationLog.class, "eventId > lastEvent"); - query.declareParameters("java.lang.Long lastEvent"); + List<Object> parameterVals = new ArrayList<>(); + parameterVals.add(lastEvent); + StringBuilder filterBuilder = new StringBuilder("eventId > para" + parameterVals.size()); + StringBuilder parameterBuilder = new StringBuilder("java.lang.Long para" + parameterVals.size()); + if (rqst.isSetEventTypeSkipList()) { + for (String eventType : rqst.getEventTypeSkipList()) { + parameterVals.add(eventType); + parameterBuilder.append(", java.lang.String para" + parameterVals.size()); + filterBuilder.append(" && eventType != para" + parameterVals.size()); + } + } + query = pm.newQuery(MNotificationLog.class, filterBuilder.toString()); + query.declareParameters(parameterBuilder.toString()); query.setOrdering("eventId ascending"); int maxEventResponse = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_MAX_EVENT_RESPONSE); int maxEvents = (rqst.getMaxEvents() < maxEventResponse && rqst.getMaxEvents() > 0) ? 
rqst.getMaxEvents() : maxEventResponse; query.setRange(0, maxEvents); - Collection<MNotificationLog> events = (Collection) query.execute(lastEvent); + Collection<MNotificationLog> events = + (Collection) query.executeWithArray(parameterVals.toArray(new Object[parameterVals.size()])); commited = commitTransaction(); if (events == null) { return result; http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index e4ef46f..bb504b0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -38,6 +38,7 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DatabaseName; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.TableName; @@ -49,13 +50,29 @@ import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.HiveAlterHandler; import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; import 
org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.DeleteTableColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.DeletePartitionColumnStatMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageBuilder; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.FileUtils; import org.apache.hadoop.hive.metastore.utils.JavaUtils; @@ -94,9 +111,11 @@ public class CachedStore implements RawStore, Configurable { private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); private RawStore rawStore = null; private Configuration conf; - private boolean areTxnStatsSupported; + private static boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; private static final SharedCache sharedCache = new SharedCache(); + private static boolean canUseEvents = false; + private static long lastEventId; 
static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); @@ -119,7 +138,38 @@ public class CachedStore implements RawStore, Configurable { initSharedCache(conf); } + synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) { + if (!isCachePrewarmed.get()) { + LOG.error("cache update should be done only after prewarm"); + throw new RuntimeException("cache update should be done only after prewarm"); + } + long startTime = System.nanoTime(); + long preEventId = lastEventId; + try { + lastEventId = updateUsingNotificationEvents(rawStore, lastEventId); + } catch (Exception e) { + LOG.error(" cache update failed for start event id " + lastEventId + " with error ", e); + throw new RuntimeException(e.getMessage()); + } finally { + long endTime = System.nanoTime(); + LOG.info("Time taken in updateUsingNotificationEvents for num events : " + (lastEventId - preEventId) + " = " + + (endTime - startTime) / 1000000 + "ms"); + } + } + + synchronized private static void triggerPreWarm(RawStore rawStore) { + lastEventId = rawStore.getCurrentNotificationEventId().getEventId(); + prewarm(rawStore); + } + private void setConfInternal(Configuration conf) { + if (MetastoreConf.getBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT)) { + canUseEvents = true; + } else { + canUseEvents = false; + } + LOG.info("canUseEvents is set to " + canUseEvents + " in cached Store"); + String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); if (rawStore == null) { @@ -151,6 +201,201 @@ public class CachedStore implements RawStore, Configurable { } @VisibleForTesting + public static SharedCache getSharedCache() { + return sharedCache; + } + + static private ColumnStatistics updateStatsForPart(RawStore rawStore, Table before, String catalogName, + String dbName, String tableName, Partition part) throws Exception { + ColumnStatistics colStats; + List<String> deletedCols = new ArrayList<>(); + 
colStats = HiveAlterHandler.updateOrGetPartitionColumnStats(rawStore, catalogName, dbName, tableName, + part.getValues(), part.getSd().getCols(), before, part, null, deletedCols); + for (String column : deletedCols) { + sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, part.getValues(), column); + } + if (colStats != null) { + sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, part.getValues(), colStats.getStatsObj()); + } + return colStats; + } + + static private void updateStatsForTable(RawStore rawStore, Table before, Table after, String catalogName, + String dbName, String tableName) throws Exception { + ColumnStatistics colStats = null; + List<String> deletedCols = new ArrayList<>(); + if (before.isSetPartitionKeys()) { + List<Partition> parts = sharedCache.listCachedPartitions(catalogName, dbName, tableName, -1); + for (Partition part : parts) { + colStats = updateStatsForPart(rawStore, before, catalogName, dbName, tableName, part); + } + } + + boolean needUpdateAggrStat = false; + List<ColumnStatisticsObj> statisticsObjs = HiveAlterHandler.alterTableUpdateTableColumnStats(rawStore, before, + after,null, null, rawStore.getConf(), deletedCols); + if (colStats != null) { + sharedCache.updateTableColStatsInCache(catalogName, dbName, tableName, statisticsObjs); + needUpdateAggrStat = true; + } + for (String column : deletedCols) { + sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, column); + needUpdateAggrStat = true; + } + } + + @VisibleForTesting + public static long updateUsingNotificationEvents(RawStore rawStore, long lastEventId) throws Exception { + LOG.debug("updating cache using notification events starting from event id " + lastEventId); + NotificationEventRequest rqst = new NotificationEventRequest(lastEventId); + + //Add the events which are not related to metadata update + rqst.addToEventTypeSkipList(MessageBuilder.INSERT_EVENT); + 
rqst.addToEventTypeSkipList(MessageBuilder.OPEN_TXN_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.COMMIT_TXN_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ABORT_TXN_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ALLOC_WRITE_ID_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ACID_WRITE_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.CREATE_FUNCTION_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.DROP_FUNCTION_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ADD_PRIMARYKEY_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ADD_FOREIGNKEY_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ADD_UNIQUECONSTRAINT_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ADD_NOTNULLCONSTRAINT_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.DROP_CONSTRAINT_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.CREATE_ISCHEMA_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ALTER_ISCHEMA_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.DROP_ISCHEMA_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ADD_SCHEMA_VERSION_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.ALTER_SCHEMA_VERSION_EVENT); + rqst.addToEventTypeSkipList(MessageBuilder.DROP_SCHEMA_VERSION_EVENT); + + Deadline.startTimer("getNextNotification"); + NotificationEventResponse resp = rawStore.getNextNotification(rqst); + Deadline.stopTimer(); + + if (resp == null || resp.getEvents() == null) { + LOG.debug("no events to process"); + return lastEventId; + } + + List<NotificationEvent> eventList = resp.getEvents(); + LOG.debug("num events to process" + eventList.size()); + + for (NotificationEvent event : eventList) { + long eventId = event.getEventId(); + if (eventId <= lastEventId) { + LOG.error("Event id is not valid " + lastEventId + " : " + eventId); + throw new RuntimeException(" event id is not valid " + lastEventId + " : " + eventId); + } + lastEventId = eventId; + String message = event.getMessage(); + LOG.debug("Event to process " + event); + 
MessageDeserializer deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer(); + String catalogName = event.getCatName() == null ? "" : event.getCatName().toLowerCase(); + String dbName = event.getDbName() == null ? "" : event.getDbName().toLowerCase(); + String tableName = event.getTableName() == null ? "" : event.getTableName().toLowerCase(); + if (!shouldCacheTable(catalogName, dbName, tableName)) { + continue; + } + switch (event.getEventType()) { + case MessageBuilder.ADD_PARTITION_EVENT: + AddPartitionMessage addPartMessage = deserializer.getAddPartitionMessage(message); + sharedCache.addPartitionsToCache(catalogName, + dbName, tableName, addPartMessage.getPartitionObjs()); + break; + case MessageBuilder.ALTER_PARTITION_EVENT: + AlterPartitionMessage alterPartitionMessage = deserializer.getAlterPartitionMessage(message); + sharedCache.alterPartitionInCache(catalogName, dbName, tableName, + alterPartitionMessage.getPtnObjBefore().getValues(), alterPartitionMessage.getPtnObjAfter()); + //TODO : Use the stat object stored in the alter table message to update the stats in cache. 
+ if (updateStatsForPart(rawStore, alterPartitionMessage.getTableObj(), + catalogName, dbName, tableName, alterPartitionMessage.getPtnObjAfter()) != null) { + CacheUpdateMasterWork.updateTableAggregatePartitionColStats(rawStore, catalogName, dbName, tableName); + } + break; + case MessageBuilder.DROP_PARTITION_EVENT: + DropPartitionMessage dropPartitionMessage = deserializer.getDropPartitionMessage(message); + for (Map<String, String> partMap : dropPartitionMessage.getPartitions()) { + sharedCache.removePartitionFromCache(catalogName, dbName, tableName, new ArrayList<>(partMap.values())); + } + break; + case MessageBuilder.CREATE_TABLE_EVENT: + CreateTableMessage createTableMessage = deserializer.getCreateTableMessage(message); + sharedCache.addTableToCache(catalogName, dbName, + tableName, createTableMessage.getTableObj()); + break; + case MessageBuilder.ALTER_TABLE_EVENT: + AlterTableMessage alterTableMessage = deserializer.getAlterTableMessage(message); + sharedCache.alterTableInCache(catalogName, dbName, tableName, alterTableMessage.getTableObjAfter()); + //TODO : Use the stat object stored in the alter table message to update the stats in cache. 
+ updateStatsForTable(rawStore, alterTableMessage.getTableObjBefore(), alterTableMessage.getTableObjAfter(), + catalogName, dbName, tableName); + break; + case MessageBuilder.DROP_TABLE_EVENT: + DropTableMessage dropTableMessage = deserializer.getDropTableMessage(message); + int batchSize = MetastoreConf.getIntVar(rawStore.getConf(), ConfVars.BATCH_RETRIEVE_OBJECTS_MAX); + String tableDnsPath = null; + Path tablePath = new Path(dropTableMessage.getTableObj().getSd().getLocation()); + if (tablePath != null) { + tableDnsPath = new Warehouse(rawStore.getConf()).getDnsPath(tablePath).toString(); + } + + while (true) { + Map<String, String> partitionLocations = rawStore.getPartitionLocations(catalogName, dbName, tableName, + tableDnsPath, batchSize); + if (partitionLocations == null || partitionLocations.isEmpty()) { + break; + } + sharedCache.removePartitionFromCache(catalogName, dbName, tableName, + new ArrayList<>(partitionLocations.values())); + } + sharedCache.removeTableFromCache(catalogName, dbName, tableName); + break; + case MessageBuilder.CREATE_DATABASE_EVENT: + CreateDatabaseMessage createDatabaseMessage = deserializer.getCreateDatabaseMessage(message); + sharedCache.addDatabaseToCache(createDatabaseMessage.getDatabaseObject()); + break; + case MessageBuilder.ALTER_DATABASE_EVENT: + AlterDatabaseMessage alterDatabaseMessage = deserializer.getAlterDatabaseMessage(message); + sharedCache.alterDatabaseInCache(catalogName, dbName, alterDatabaseMessage.getDbObjAfter()); + break; + case MessageBuilder.DROP_DATABASE_EVENT: + sharedCache.removeDatabaseFromCache(catalogName, dbName); + break; + case MessageBuilder.CREATE_CATALOG_EVENT: + case MessageBuilder.DROP_CATALOG_EVENT: + case MessageBuilder.ALTER_CATALOG_EVENT: + // TODO : Need to add cache invalidation for catalog events + LOG.error("catalog Events are not supported for cache invalidation : " + event.getEventType()); + break; + case MessageBuilder.UPDATE_TBL_COL_STAT_EVENT: + UpdateTableColumnStatMessage msg 
= deserializer.getUpdateTableColumnStatMessage(message); + updateTableColumnsStatsInternal(rawStore.getConf(), msg.getColumnStatistics(), msg.getParameters(), + msg.getValidWriteIds(), msg.getWriteId()); + break; + case MessageBuilder.DELETE_TBL_COL_STAT_EVENT: + DeleteTableColumnStatMessage msgDel = deserializer.getDeleteTableColumnStatMessage(message); + sharedCache.removeTableColStatsFromCache(catalogName, dbName, tableName, msgDel.getColName()); + break; + case MessageBuilder.UPDATE_PART_COL_STAT_EVENT: + UpdatePartitionColumnStatMessage msgPartUpdate = deserializer.getUpdatePartitionColumnStatMessage(message); + sharedCache.updatePartitionColStatsInCache(catalogName, dbName, tableName, msgPartUpdate.getPartVals(), + msgPartUpdate.getColumnStatistics().getStatsObj()); + break; + case MessageBuilder.DELETE_PART_COL_STAT_EVENT: + DeletePartitionColumnStatMessage msgPart = deserializer.getDeletePartitionColumnStatMessage(message); + sharedCache.removePartitionColStatsFromCache(catalogName, dbName, tableName, + msgPart.getPartValues(), msgPart.getColName()); + break; + default: + LOG.error("Event is not supported for cache invalidation : " + event.getEventType()); + } + } + return lastEventId; + } + + @VisibleForTesting /** * This initializes the caches in SharedCache by getting the objects from Metastore DB via * ObjectStore and populating the respective caches @@ -161,6 +406,7 @@ public class CachedStore implements RawStore, Configurable { } long startTime = System.nanoTime(); LOG.info("Prewarming CachedStore"); + long sleepTime = 100; while (!isCachePrewarmed.get()) { // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy Deadline.registerIfNot(1000000); @@ -176,6 +422,12 @@ public class CachedStore implements RawStore, Configurable { sharedCache.populateCatalogsInCache(catalogs); } catch (MetaException | NoSuchObjectException e) { LOG.warn("Failed to populate catalogs in cache, going to try again", e); + try { + 
Thread.sleep(sleepTime); + sleepTime = sleepTime * 2; + } catch (InterruptedException timerEx) { + LOG.info("sleep interrupted", timerEx.getMessage()); + } // try again continue; } @@ -407,8 +659,7 @@ public class CachedStore implements RawStore, Configurable { } if (runOnlyOnce) { // Some tests control the execution of the background update thread - cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, - TimeUnit.MILLISECONDS); + cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS); } } @@ -439,6 +690,7 @@ public class CachedStore implements RawStore, Configurable { private boolean shouldRunPrewarm = true; private final RawStore rawStore; + CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) { this.shouldRunPrewarm = shouldRunPrewarm; String rawStoreClassName = @@ -456,11 +708,19 @@ public class CachedStore implements RawStore, Configurable { @Override public void run() { if (!shouldRunPrewarm) { - // TODO: prewarm and update can probably be merged. - update(); + if (canUseEvents) { + try { + triggerUpdateUsingEvent(rawStore); + } catch (Exception e) { + LOG.error("failed to update cache using events ", e); + } + } else { + // TODO: prewarm and update can probably be merged. 
+ update(); + } } else { try { - prewarm(rawStore); + triggerPreWarm(rawStore); } catch (Exception e) { LOG.error("Prewarm failure", e); return; @@ -619,7 +879,7 @@ public class CachedStore implements RawStore, Configurable { // Update cached aggregate stats for all partitions of a table and for all // but default partition - private void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, + private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { try { Table table = rawStore.getTable(catName, dbName, tblName); @@ -675,7 +935,22 @@ public class CachedStore implements RawStore, Configurable { @Override public boolean commitTransaction() { - return rawStore.commitTransaction(); + if (!rawStore.commitTransaction()) { + return false; + } + + // In case of event based update, shared cache is not updated directly to avoid inconsistency. + // For example, if metastore B add a partition, then metastore A drop a partition later. However, on metastore A, + // it first get drop partition request, then from notification, create the partition. If there's no tombstone + // entry in partition cache to tell drop is after creation, we end up consumes the creation request. Though + // eventually there's drop partition notification, but during the interim, later event takes precedence. + // So we will not update the cache during raw store operation but wait during commit transaction to make sure that + // the event related to the current transactions are updated in the cache and thus we can support strong + // consistency in case there is only one metastore. 
+ if (canUseEvents) { + triggerUpdateUsingEvent(rawStore); + } + return true; } @Override @@ -691,19 +966,26 @@ public class CachedStore implements RawStore, Configurable { @Override public void createCatalog(Catalog cat) throws MetaException { rawStore.createCatalog(cat); - sharedCache.addCatalogToCache(cat); + // in case of event based cache update, cache will not be updated for catalog. + if (!canUseEvents) { + sharedCache.addCatalogToCache(cat); + } } @Override public void alterCatalog(String catName, Catalog cat) throws MetaException, InvalidOperationException { rawStore.alterCatalog(catName, cat); - sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat); + // in case of event based cache update, cache will not be updated for catalog. + if (!canUseEvents) { + sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat); + } } @Override public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException { - if (!sharedCache.isCatalogCachePrewarmed()) { + // in case of event based cache update, cache will not be updated for catalog. + if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { return rawStore.getCatalog(catalogName); } Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName)); @@ -715,7 +997,8 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getCatalogs() throws MetaException { - if (!sharedCache.isCatalogCachePrewarmed()) { + // in case of event based cache update, cache will not be updated for catalog. 
+ if (!sharedCache.isCatalogCachePrewarmed() || canUseEvents) { return rawStore.getCatalogs(); } return sharedCache.listCachedCatalogs(); @@ -724,19 +1007,29 @@ public class CachedStore implements RawStore, Configurable { @Override public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException { rawStore.dropCatalog(catalogName); - catalogName = catalogName.toLowerCase(); - sharedCache.removeCatalogFromCache(catalogName); + + // in case of event based cache update, cache will not be updated for catalog. + if (!canUseEvents) { + catalogName = catalogName.toLowerCase(); + sharedCache.removeCatalogFromCache(catalogName); + } } @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - sharedCache.addDatabaseToCache(db); + // in case of event based cache update, cache will be updated during commit. + if (!canUseEvents) { + sharedCache.addDatabaseToCache(db); + } } @Override public Database getDatabase(String catName, String dbName) throws NoSuchObjectException { - if (!sharedCache.isDatabaseCachePrewarmed()) { + // in case of event based cache update, cache will be updated during commit. So within active transaction, read + // directly from rawStore to avoid reading stale data as the data updated during same transaction will not be + // updated in the cache. + if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getDatabase(catName, dbName); } dbName = dbName.toLowerCase(); @@ -751,7 +1044,8 @@ public class CachedStore implements RawStore, Configurable { @Override public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException { boolean succ = rawStore.dropDatabase(catName, dbName); - if (succ) { + if (succ && !canUseEvents) { + // in case of event based cache update, cache will be updated during commit. 
sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName)); } @@ -762,7 +1056,8 @@ public class CachedStore implements RawStore, Configurable { public boolean alterDatabase(String catName, String dbName, Database db) throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(catName, dbName, db); - if (succ) { + if (succ && !canUseEvents) { + // in case of event based cache update, cache will be updated during commit. sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), StringUtils.normalizeIdentifier(dbName), db); } @@ -771,7 +1066,7 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getDatabases(String catName, String pattern) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed()) { + if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getDatabases(catName, pattern); } return sharedCache.listCachedDatabases(catName, pattern); @@ -779,7 +1074,7 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllDatabases(String catName) throws MetaException { - if (!sharedCache.isDatabaseCachePrewarmed()) { + if (!sharedCache.isDatabaseCachePrewarmed() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getAllDatabases(catName); } return sharedCache.listCachedDatabases(catName); @@ -821,6 +1116,10 @@ public class CachedStore implements RawStore, Configurable { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); + // in case of event based cache update, cache will be updated during commit. 
+ if (canUseEvents) { + return; + } String catName = normalizeIdentifier(tbl.getCatName()); String dbName = normalizeIdentifier(tbl.getDbName()); String tblName = normalizeIdentifier(tbl.getTableName()); @@ -835,7 +1134,8 @@ public class CachedStore implements RawStore, Configurable { public boolean dropTable(String catName, String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(catName, dbName, tblName); - if (succ) { + // in case of event based cache update, cache will be updated during commit. + if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -857,7 +1157,7 @@ public class CachedStore implements RawStore, Configurable { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTable(catName, dbName, tblName, validWriteIds); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -898,7 +1198,8 @@ public class CachedStore implements RawStore, Configurable { @Override public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); - if (succ) { + // in case of event based cache update, cache will be updated during commit. + if (succ && !canUseEvents) { String dbName = normalizeIdentifier(part.getDbName()); String tblName = normalizeIdentifier(part.getTableName()); String catName = part.isSetCatName() ? 
normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME; @@ -914,7 +1215,8 @@ public class CachedStore implements RawStore, Configurable { public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts); - if (succ) { + // in case of event based cache update, cache will be updated during commit. + if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -930,7 +1232,8 @@ public class CachedStore implements RawStore, Configurable { public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec, boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists); - if (succ) { + // in case of event based cache update, cache will be updated during commit. 
+ if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -959,7 +1262,7 @@ public class CachedStore implements RawStore, Configurable { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartition( catName, dbName, tblName, part_vals, validWriteIds); } @@ -990,7 +1293,7 @@ public class CachedStore implements RawStore, Configurable { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1005,7 +1308,8 @@ public class CachedStore implements RawStore, Configurable { public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals); - if (succ) { + // in case of event based cache update, cache will be updated during commit. 
+ if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1021,6 +1325,10 @@ public class CachedStore implements RawStore, Configurable { public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(catName, dbName, tblName, partNames); + // in case of event based cache update, cache will be updated during commit. + if (canUseEvents) { + return; + } catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); @@ -1040,7 +1348,7 @@ public class CachedStore implements RawStore, Configurable { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartitions(catName, dbName, tblName, max); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1062,6 +1370,10 @@ public class CachedStore implements RawStore, Configurable { public Table alterTable(String catName, String dbName, String tblName, Table newTable, String validWriteIds) throws InvalidObjectException, MetaException { newTable = rawStore.alterTable(catName, dbName, tblName, newTable, validWriteIds); + // in case of event based cache update, cache will be updated during commit. 
+ if (canUseEvents) { + return newTable; + } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1096,7 +1408,8 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String catName, String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTables(catName, dbName, pattern); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1106,7 +1419,8 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String catName, String dbName, String pattern, TableType tableType) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() + || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTables(catName, dbName, pattern, tableType); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1123,7 +1437,8 @@ public class CachedStore implements RawStore, Configurable { public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes) throws MetaException { // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); } return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName), @@ -1134,6 +1449,9 @@ public class CachedStore implements RawStore, Configurable { @Override public List<Table> 
getTableObjectsByName(String catName, String dbName, List<String> tblNames) throws MetaException, UnknownDBException { + if (canUseEvents && rawStore.isActiveTransaction()) { + return rawStore.getTableObjectsByName(catName, dbName, tblNames); + } dbName = normalizeIdentifier(dbName); catName = normalizeIdentifier(catName); boolean missSomeInCache = false; @@ -1168,7 +1486,8 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllTables(String catName, String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getAllTables(catName, dbName); } return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), @@ -1188,7 +1507,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.listPartitionNames(catName, dbName, tblName, max_parts); } Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1218,6 +1537,10 @@ public class CachedStore implements RawStore, Configurable { List<String> partVals, Partition newPart, String validWriteIds) throws InvalidObjectException, MetaException { newPart = rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, validWriteIds); + // in case of event based cache update, cache will be updated during commit. 
+ if (canUseEvents) { + return newPart; + } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1235,6 +1558,10 @@ public class CachedStore implements RawStore, Configurable { throws InvalidObjectException, MetaException { newParts = rawStore.alterPartitions( catName, dbName, tblName, partValsList, newParts, writeId, validWriteIds); + // in case of event based cache update, cache will be updated during commit. + if (canUseEvents) { + return newParts; + } catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1286,7 +1613,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, result); } List<String> partNames = new LinkedList<>(); @@ -1317,7 +1644,7 @@ public class CachedStore implements RawStore, Configurable { catName = normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr); } String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); @@ -1332,7 +1659,8 @@ public class CachedStore implements RawStore, Configurable { return partNames.size(); } - private static List<String> partNameToVals(String name) { + @VisibleForTesting + public static List<String> partNameToVals(String name) { if 
(name == null) { return null; } @@ -1350,7 +1678,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1537,7 +1865,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1562,7 +1890,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1591,7 +1919,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if 
(!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.listPartitionNamesPs(catName, dbName, tblName, partSpecs, maxParts); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1620,7 +1948,7 @@ public class CachedStore implements RawStore, Configurable { catName = StringUtils.normalizeIdentifier(catName); dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { + if (!shouldCacheTable(catName, dbName, tblName) || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partSpecs, maxParts, userName, groupNames); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -1702,29 +2030,58 @@ public class CachedStore implements RawStore, Configurable { return colStat; } + private static void updateTableColumnsStatsInternal(Configuration conf, ColumnStatistics colStats, + Map<String, String> newParams, String validWriteIds, + long writeId) throws MetaException { + String catName = colStats.getStatsDesc().isSetCatName() ? 
+ normalizeIdentifier(colStats.getStatsDesc().getCatName()) : + getDefaultCatalog(conf); + String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return; + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return; + } + + boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters()); + if (isTxn && validWriteIds != null) { + if (!areTxnStatsSupported) { + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + } else { + String errorMsg = ObjectStore.verifyStatsChangeCtx( + table.getParameters(), newParams, writeId, validWriteIds, true); + if (errorMsg != null) { + throw new MetaException(errorMsg); + } + if (!ObjectStore.isCurrentStatsValidForTheQuery(conf, newParams, table.getWriteId(), + validWriteIds, true)) { + // Make sure we set the flag to invalid regardless of the current value. + StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE); + LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + + table.getDbName() + "." + table.getTableName()); + } + } + } + + table.setWriteId(writeId); + table.setParameters(newParams); + sharedCache.alterTableInCache(catName, dbName, tblName, table); + sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj()); + } + @Override public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { Map<String, String> newParams = rawStore.updateTableColumnStatistics( colStats, validWriteIds, writeId); - if (newParams != null) { - String catName = colStats.getStatsDesc().isSetCatName() ? 
- normalizeIdentifier(colStats.getStatsDesc().getCatName()) : - getDefaultCatalog(conf); - String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); - String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); - if (!shouldCacheTable(catName, dbName, tblName)) { - return newParams; - } - Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null) { - // The table is not yet loaded in cache - return newParams; - } - table.setParameters(newParams); - sharedCache.alterTableInCache(catName, dbName, tblName, table); - sharedCache.updateTableColStatsInCache(catName, dbName, tblName, colStats.getStatsObj()); + // in case of event based cache update, cache will be updated during commit. + if (newParams != null && !canUseEvents) { + updateTableColumnsStatsInternal(conf, colStats, newParams, null, writeId); } return newParams; } @@ -1765,7 +2122,8 @@ public class CachedStore implements RawStore, Configurable { String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName); - if (succ) { + // in case of event based cache update, cache is updated during commit txn + if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1783,7 +2141,8 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { Map<String, String> newParams = rawStore.updatePartitionColumnStatistics( colStats, partVals, validWriteIds, writeId); - if (newParams != null) { + // in case of event based cache update, cache is updated during commit txn + if (newParams != null && !canUseEvents) { String catName = colStats.getStatsDesc().isSetCatName() ? 
normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); @@ -1822,7 +2181,8 @@ public class CachedStore implements RawStore, Configurable { throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName); - if (succ) { + // in case of event based cache update, cache is updated during commit txn. + if (succ && !canUseEvents) { catName = normalizeIdentifier(catName); dbName = normalizeIdentifier(dbName); tblName = normalizeIdentifier(tblName); @@ -1851,8 +2211,9 @@ public class CachedStore implements RawStore, Configurable { tblName = StringUtils.normalizeIdentifier(tblName); // TODO: we currently cannot do transactional checks for stats here // (incl. due to lack of sync w.r.t. the below rawStore call). - if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null) { - rawStore.get_aggr_stats_for( + //TODO : need to calculate aggregate locally in cached store + if (!shouldCacheTable(catName, dbName, tblName) || writeIdList != null || canUseEvents) { + return rawStore.get_aggr_stats_for( catName, dbName, tblName, partNames, colNames, writeIdList); } Table table = sharedCache.getTableFromCache(catName, dbName, tblName); @@ -2246,6 +2607,10 @@ public class CachedStore implements RawStore, Configurable { // TODO constraintCache List<String> constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); + // in case of event based cache update, cache is updated during commit. + if (canUseEvents) { + return constraintNames; + } String dbName = normalizeIdentifier(tbl.getDbName()); String tblName = normalizeIdentifier(tbl.getTableName()); String catName = tbl.isSetCatName() ? 
normalizeIdentifier(tbl.getCatName()) : http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index c24e716..ce9e383 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -210,7 +210,7 @@ public class SharedCache { } } - boolean cachePartitions(List<Partition> parts, SharedCache sharedCache) { + boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache) { try { tableLock.writeLock().lock(); for (Partition part : parts) { @@ -292,6 +292,9 @@ public class SharedCache { tableLock.writeLock().lock(); PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildPartitionCacheKey(partVal)); + if (wrapper == null) { + return null; + } isPartitionCacheDirty.set(true); part = CacheUtils.assemble(wrapper, sharedCache); if (wrapper.getSdHash() != null) { @@ -1171,6 +1174,10 @@ public class SharedCache { } TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper == null) { + //in case of retry, ignore second try. 
+ return; + } byte[] sdHash = tblWrapper.getSdHash(); if (sdHash != null) { decrSd(sdHash); @@ -1408,7 +1415,7 @@ public class SharedCache { } public void addPartitionsToCache(String catName, String dbName, String tblName, - List<Partition> parts) { + Iterable<Partition> parts) { try { cacheLock.readLock().lock(); TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java new file mode 100644 index 0000000..d64b57d --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeletePartitionColumnStatEvent.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.IHMSHandler; + +import java.util.List; + +/** + * DeletePartitionColumnStatEvent + * Event generated for partition column stat delete event. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class DeletePartitionColumnStatEvent extends ListenerEvent { + private String catName, dbName, tableName, colName, partName; + + private List<String> partVals; + + /** + * @param catName catalog name + * @param dbName database name + * @param tableName table name + * @param partName partition column name + * @param partVals partition value + * @param colName column name + * @param handler handler that is firing the event + */ + public DeletePartitionColumnStatEvent(String catName, String dbName, String tableName, String partName, + List<String> partVals, String colName, IHMSHandler handler) { + super(true, handler); + this.catName = catName; + this.dbName = dbName; + this.tableName = tableName; + this.colName = colName; + this.partName = partName; + this.partVals = partVals; + } + + public String getCatName() { + return catName; + } + + public String getDBName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getColName() { + return colName; + } + + public String getPartName() { + return partName; + } + + public List<String> getPartVals() { + return partVals; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/ef7c3963/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java new file mode 100644 index 0000000..7638744 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/DeleteTableColumnStatEvent.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.events; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hive.metastore.IHMSHandler; + +/** + * DeleteTableColumnStatEvent + * Event generated for table column stat delete event. 
+ */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class DeleteTableColumnStatEvent extends ListenerEvent { + private String catName, dbName, tableName, colName; + + /** + * @param catName catalog name + * @param dbName database name + * @param tableName table name + * @param colName column name + * @param handler handler that is firing the event + */ + public DeleteTableColumnStatEvent(String catName, String dbName, String tableName, String colName, IHMSHandler handler) { + super(true, handler); + this.catName = catName; + this.dbName = dbName; + this.tableName = tableName; + this.colName = colName; + } + + public String getCatName() { + return catName; + } + + public String getDBName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + public String getColName() { + return colName; + } +}
