http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java new file mode 100644 index 0000000..7beee42 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.cache; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; +import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; +import org.apache.hadoop.hive.metastore.hbase.HBaseUtils; +import org.apache.hive.common.util.HiveStringUtils; + +import com.google.common.annotations.VisibleForTesting; + +public class SharedCache { + private static Map<String, Database> databaseCache = new TreeMap<String, Database>(); + private static Map<String, TableWrapper> tableCache = new TreeMap<String, TableWrapper>(); + private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>(); + private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>(); + private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>(); + private static MessageDigest md; + + static { + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException("should not happen", e); + } + } + + public static synchronized Database getDatabaseFromCache(String name) { + return 
databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; + } + + public static synchronized void addDatabaseToCache(String dbName, Database db) { + Database dbCopy = db.deepCopy(); + dbCopy.setName(HiveStringUtils.normalizeIdentifier(dbName)); + databaseCache.put(dbName, dbCopy); + } + + public static synchronized void removeDatabaseFromCache(String dbName) { + databaseCache.remove(dbName); + } + + public static synchronized List<String> listCachedDatabases() { + return new ArrayList<String>(databaseCache.keySet()); + } + + public static synchronized void alterDatabaseInCache(String dbName, Database newDb) { + removeDatabaseFromCache(HiveStringUtils.normalizeIdentifier(dbName)); + addDatabaseToCache(HiveStringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); + } + + public static synchronized int getCachedDatabaseCount() { + return databaseCache.size(); + } + + public static synchronized Table getTableFromCache(String dbName, String tableName) { + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper == null) { + return null; + } + Table t = CacheUtils.assemble(tblWrapper); + return t; + } + + public static synchronized void addTableToCache(String dbName, String tblName, Table tbl) { + Table tblCopy = tbl.deepCopy(); + tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName)); + tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName)); + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + } + TableWrapper wrapper; + if (tbl.getSd()!=null) { + byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md); + StorageDescriptor sd = tbl.getSd(); + increSd(sd, sdHash); + tblCopy.setSd(null); + wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new TableWrapper(tblCopy, null, null, null); + } + tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + } + 
+ public static synchronized void removeTableFromCache(String dbName, String tblName) { + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash!=null) { + decrSd(sdHash); + } + } + + public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { + removeTableFromCache(dbName, tblName); + addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), + HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); + for (Partition part : partitions) { + removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues()); + part.setDbName(HiveStringUtils.normalizeIdentifier(newTable.getDbName())); + part.setTableName(HiveStringUtils.normalizeIdentifier(newTable.getTableName())); + addPartitionToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), + HiveStringUtils.normalizeIdentifier(newTable.getTableName()), part); + } + } + } + + public static synchronized int getCachedTableCount() { + return tableCache.size(); + } + + public static synchronized List<Table> listCachedTables(String dbName) { + List<Table> tables = new ArrayList<Table>(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName)) { + tables.add(CacheUtils.assemble(wrapper)); + } + } + return tables; + } + + public static synchronized void updateTableColumnStatistics(String dbName, String tableName, + List<ColumnStatisticsObj> statsObjs) { + Table tbl = getTableFromCache(dbName, tableName); + tbl.getSd().getParameters(); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj:statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); + 
alterTableInCache(dbName, tableName, tbl); + } + + public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) { + List<TableMeta> tableMetas = new ArrayList<TableMeta>(); + for (String dbName : listCachedDatabases()) { + if (CacheUtils.matches(dbName, dbNames)) { + for (Table table : listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), tableNames)) { + if (tableTypes==null || tableTypes.contains(table.getTableType())) { + TableMeta metaData = new TableMeta( + dbName, table.getTableName(), table.getTableType()); + metaData.setComments(table.getParameters().get("comment")); + tableMetas.add(metaData); + } + } + } + } + } + return tableMetas; + } + + public static synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { + Partition partCopy = part.deepCopy(); + PartitionWrapper wrapper; + if (part.getSd()!=null) { + byte[] sdHash = HBaseUtils.hashStorageDescriptor(part.getSd(), md); + StorageDescriptor sd = part.getSd(); + increSd(sd, sdHash); + partCopy.setSd(null); + wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new PartitionWrapper(partCopy, null, null, null); + } + partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); + } + + public static synchronized Partition getPartitionFromCache(String key) { + PartitionWrapper wrapper = partitionCache.get(key); + if (wrapper == null) { + return null; + } + Partition p = CacheUtils.assemble(wrapper); + return p; + } + + public static synchronized Partition getPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + + public static synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) { + return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); + } + 
+ public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) { + PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); + if (wrapper.getSdHash()!=null) { + decrSd(wrapper.getSdHash()); + } + return wrapper.getPartition(); + } + + public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) { + List<Partition> partitions = new ArrayList<Partition>(); + int count = 0; + for (PartitionWrapper wrapper : partitionCache.values()) { + if (wrapper.getPartition().getDbName().equals(dbName) + && wrapper.getPartition().getTableName().equals(tblName) + && (max == -1 || count < max)) { + partitions.add(CacheUtils.assemble(wrapper)); + count++; + } + } + return partitions; + } + + public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) { + removePartitionFromCache(dbName, tblName, partVals); + addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), + HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart); + } + + public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName, + List<String> partVals, List<ColumnStatisticsObj> statsObjs) { + Partition part = getPartitionFromCache(dbName, tableName, partVals); + part.getSd().getParameters(); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj:statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); + alterPartitionInCache(dbName, tableName, partVals, part); + } + + public static synchronized int getCachedPartitionCount() { + return partitionCache.size(); + } + + public static synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { + return partitionColStatsCache.get(key); + } + + public static synchronized void 
addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) { + partitionColStatsCache.putAll(aggrStatsPerPartition); + } + + + public static void increSd(StorageDescriptor sd, byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + if (sdCache.containsKey(byteArray)) { + sdCache.get(byteArray).refCount++; + } else { + StorageDescriptor sdToCache = sd.deepCopy(); + sdToCache.setLocation(null); + sdToCache.setParameters(null); + sdCache.put(byteArray, new StorageDescriptorWrapper(sdToCache, 1)); + } + } + + public static void decrSd(byte[] sdHash) { + ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); + StorageDescriptorWrapper sdWrapper = sdCache.get(byteArray); + sdWrapper.refCount--; + if (sdWrapper.getRefCount() == 0) { + sdCache.remove(byteArray); + } + } + + public static StorageDescriptor getSdFromCache(byte[] sdHash) { + StorageDescriptorWrapper sdWrapper = sdCache.get(new ByteArrayWrapper(sdHash)); + return sdWrapper.getSd(); + } + + // Replace databases in databaseCache with the new list + public static synchronized void refreshDatabases(List<Database> databases) { + for (String dbName : listCachedDatabases()) { + removeDatabaseFromCache(dbName); + } + for (Database db : databases) { + addDatabaseToCache(db.getName(), db); + } + } + + // Replace tables in tableCache with the new list + public static synchronized void refreshTables(String dbName, List<Table> tables) { + for (Table tbl : listCachedTables(dbName)) { + removeTableFromCache(dbName, tbl.getTableName()); + } + for (Table tbl : tables) { + addTableToCache(dbName, tbl.getTableName(), tbl); + } + } + + public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) { + List<String> keysToRemove = new ArrayList<String>(); + for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) { + if (entry.getValue().getPartition().getDbName().equals(dbName) + && 
entry.getValue().getPartition().getTableName().equals(tblName)) { + keysToRemove.add(entry.getKey()); + } + } + for (String key : keysToRemove) { + partitionCache.remove(key); + } + for (Partition part : partitions) { + addPartitionToCache(dbName, tblName, part); + } + } + + @VisibleForTesting + static Map<String, Database> getDatabaseCache() { + return databaseCache; + } + + @VisibleForTesting + static Map<String, TableWrapper> getTableCache() { + return tableCache; + } + + @VisibleForTesting + static Map<String, PartitionWrapper> getPartitionCache() { + return partitionCache; + } + + @VisibleForTesting + static Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() { + return sdCache; + } + + @VisibleForTesting + static Map<String, ColumnStatisticsObj> getPartitionColStatsCache() { + return partitionColStatsCache; + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java index 8edb50b..e5b8495 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterPartitionEvent.java @@ -27,13 +27,15 @@ public class AlterPartitionEvent extends ListenerEvent { private final Partition oldPart; private final Partition newPart; private final Table table; + private final boolean isTruncateOp; - public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, - boolean status, HMSHandler handler) { + public AlterPartitionEvent(Partition oldPart, Partition newPart, Table table, boolean isTruncateOp, + boolean status, HMSHandler handler) { super(status, handler); this.oldPart = oldPart; this.newPart = newPart; this.table = table; + this.isTruncateOp = isTruncateOp; } /** @@ -58,4 +60,12 @@ public class AlterPartitionEvent extends ListenerEvent { public Table getTable() { return table; } + + /** + * Get the truncate table flag + * @return + */ + public boolean getIsTruncateOp() { + return isTruncateOp; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java index 4d6dce2..22ea513 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java +++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/events/AlterTableEvent.java @@ -26,10 +26,13 @@ public class AlterTableEvent extends ListenerEvent { private final Table newTable; private final Table oldTable; - public AlterTableEvent (Table oldTable, Table newTable, boolean status, HMSHandler handler) { + private final boolean isTruncateOp; + + public AlterTableEvent (Table oldTable, Table newTable, boolean isTruncateOp, boolean status, HMSHandler handler) { super (status, handler); this.oldTable = oldTable; this.newTable = newTable; + this.isTruncateOp = isTruncateOp; } /** @@ -45,4 +48,11 @@ public class AlterTableEvent extends ListenerEvent { public Table getNewTable() { return newTable; } + + /** + * @return the flag for truncate + */ + public boolean getIsTruncateOp() { + return isTruncateOp; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java index 7bc0e04..dff1195 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java @@ -38,6 +38,7 @@ public class InsertEvent extends ListenerEvent { private final String db; private final String table; private final Map<String, String> keyValues; + private final boolean replace; private final List<String> files; private List<String> fileChecksums = new ArrayList<String>(); @@ -56,6 +57,9 @@ public class InsertEvent extends ListenerEvent { super(status, handler); this.db = db; this.table = table; + + // If replace flag is not set by caller, then by default set it to true to maintain backward compatibility + this.replace = (insertData.isSetReplace() ? 
insertData.isReplace() : true); this.files = insertData.getFilesAdded(); GetTableRequest req = new GetTableRequest(db, table); req.setCapabilities(HiveMetaStoreClient.TEST_VERSION); @@ -90,6 +94,13 @@ public class InsertEvent extends ListenerEvent { } /** + * @return The replace flag. + */ + public boolean isReplace() { + return replace; + } + + /** * Get list of files created as a result of this DML operation * * @return list of new files http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java index 62aeb8c..b741549 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java @@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events; import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import javax.annotation.concurrent.NotThreadSafe; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + /** * Base class for all the events which are defined for metastore. + * + * This class is not thread-safe and not expected to be called in parallel. */ +@NotThreadSafe public abstract class ListenerEvent { /** @@ -33,6 +41,26 @@ public abstract class ListenerEvent { private final boolean status; private final HMSHandler handler; + /** + * Key/value parameters used by listeners to store notifications results + * i.e. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID. + * + * NotThreadSafe: The parameters map is not expected to be access in parallel by Hive, so keep it thread-unsafe + * to avoid locking overhead. 
+ */ + private Map<String, String> parameters; + + /** For performance concerns, it is preferable to cache the unmodifiable parameters variable that will be returned on the + * {@link #getParameters()} method. It is expected that {@link #putParameter(String, String)} is called less times + * than {@link #getParameters()}, so performance may be better by using this cache. + */ + private Map<String, String> unmodifiableParameters; + + // Listener parameters aren't expected to have many values. So far only + // DbNotificationListener will add a parameter; let's set a low initial capacity for now. + // If we find out many parameters are added, then we can adjust or remove this initial capacity. + private static final int PARAMETERS_INITIAL_CAPACITY = 1; + // Properties passed by the client, to be used in execution hooks. private EnvironmentContext environmentContext = null; @@ -40,6 +68,8 @@ public abstract class ListenerEvent { super(); this.status = status; this.handler = handler; + this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY); + updateUnmodifiableParameters(); } /** @@ -49,6 +79,12 @@ public abstract class ListenerEvent { return status; } + /** + * Set the environment context of the event. + * + * @param environmentContext An EnvironmentContext object that contains environment parameters sent from + * the HMS client. + */ public void setEnvironmentContext(EnvironmentContext environmentContext) { this.environmentContext = environmentContext; } @@ -66,4 +102,74 @@ public abstract class ListenerEvent { public HMSHandler getHandler() { return handler; } + + /** + * Return all parameters of the listener event. Parameters are read-only (unmodifiable map). If a new parameter + * must be added, please use the putParameter() method. + * + * + * @return A map object with all parameters. + */ + public final Map<String, String> getParameters() { + return unmodifiableParameters; + } + + /** + * Put a new parameter to the listener event. 
+ * + * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param name Name of the parameter. + * @param value Value of the parameter. + * @throws IllegalStateException if a parameter already exists. + */ + public void putParameter(String name, String value) { + putParameterIfAbsent(name, value); + updateUnmodifiableParameters(); + } + + /** + * Put a new set the parameters to the listener event. + * + * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param parameters A Map object with the a set of parameters. + * @throws IllegalStateException if a parameter already exists. + */ + public void putParameters(final Map<String, String> parameters) { + if (parameters != null) { + for (Map.Entry<String, String> entry : parameters.entrySet()) { + putParameterIfAbsent(entry.getKey(), entry.getValue()); + } + + updateUnmodifiableParameters(); + } + } + + /** + * Put a parameter to the listener event only if the parameter is absent. + * + * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration + * between listeners setting the same parameters. + * + * @param name Name of the parameter. + * @param value Value of the parameter. + * @throws IllegalStateException if a parameter already exists. + */ + private void putParameterIfAbsent(String name, String value) { + if (parameters.containsKey(name)) { + throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name); + } + + parameters.put(name, value); + } + + /** + * Keeps a cache of unmodifiable parameters returned by the getParameters() method. 
+ */ + private void updateUnmodifiableParameters() { + unmodifiableParameters = Collections.unmodifiableMap(parameters); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 1340645..945e99e 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -2708,6 +2708,8 @@ public class HBaseStore implements RawStore { @Override public List<SQLPrimaryKey> getPrimaryKeys(String db_name, String tbl_name) throws MetaException { + db_name = HiveStringUtils.normalizeIdentifier(db_name); + tbl_name = HiveStringUtils.normalizeIdentifier(tbl_name); boolean commit = false; openTransaction(); try { @@ -2726,6 +2728,10 @@ public class HBaseStore implements RawStore { public List<SQLForeignKey> getForeignKeys(String parent_db_name, String parent_tbl_name, String foreign_db_name, String foreign_tbl_name) throws MetaException { + parent_db_name = parent_db_name!=null?HiveStringUtils.normalizeIdentifier(parent_db_name):null; + parent_tbl_name = parent_tbl_name!=null?HiveStringUtils.normalizeIdentifier(parent_tbl_name):null; + foreign_db_name = HiveStringUtils.normalizeIdentifier(foreign_db_name); + foreign_tbl_name = HiveStringUtils.normalizeIdentifier(foreign_tbl_name); boolean commit = false; openTransaction(); try { @@ -2770,6 +2776,9 @@ public class HBaseStore implements RawStore { // This is something of pain, since we have to search both primary key and foreign key to see // which they want to drop. 
boolean commit = false; + dbName = HiveStringUtils.normalizeIdentifier(dbName); + tableName = HiveStringUtils.normalizeIdentifier(tableName); + constraintName = HiveStringUtils.normalizeIdentifier(constraintName); openTransaction(); try { List<SQLPrimaryKey> pk = getHBase().getPrimaryKey(dbName, tableName); @@ -2809,6 +2818,12 @@ public class HBaseStore implements RawStore { @Override public void addPrimaryKeys(List<SQLPrimaryKey> pks) throws InvalidObjectException, MetaException { boolean commit = false; + for (SQLPrimaryKey pk : pks) { + pk.setTable_db(HiveStringUtils.normalizeIdentifier(pk.getTable_db())); + pk.setTable_name(HiveStringUtils.normalizeIdentifier(pk.getTable_name())); + pk.setColumn_name(HiveStringUtils.normalizeIdentifier(pk.getColumn_name())); + pk.setPk_name(HiveStringUtils.normalizeIdentifier(pk.getPk_name())); + } openTransaction(); try { List<SQLPrimaryKey> currentPk = @@ -2830,6 +2845,13 @@ public class HBaseStore implements RawStore { @Override public void addForeignKeys(List<SQLForeignKey> fks) throws InvalidObjectException, MetaException { boolean commit = false; + for (SQLForeignKey fk : fks) { + fk.setPktable_db(HiveStringUtils.normalizeIdentifier(fk.getPktable_db())); + fk.setPktable_name(HiveStringUtils.normalizeIdentifier(fk.getPktable_name())); + fk.setFktable_db(HiveStringUtils.normalizeIdentifier(fk.getFktable_db())); + fk.setFktable_name(HiveStringUtils.normalizeIdentifier(fk.getFktable_name())); + fk.setFk_name(HiveStringUtils.normalizeIdentifier(fk.getFk_name())); + } openTransaction(); try { // Fetch the existing keys (if any) and add in these new ones @@ -2848,6 +2870,13 @@ public class HBaseStore implements RawStore { } @Override + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + // TODO: see if it makes sense to implement this here + return null; + } + + @Override public void createTableWrite(Table tbl, long writeId, 
char state, long heartbeat) { // TODO: Auto-generated method stub throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 94087b1..3172f92 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -619,7 +619,7 @@ public class HBaseUtils { * @param md message descriptor to use to generate the hash * @return the hash as a byte array */ - static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) { + public static byte[] hashStorageDescriptor(StorageDescriptor sd, MessageDigest md) { // Note all maps and lists have to be absolutely sorted. Otherwise we'll produce different // results for hashes based on the OS or JVM being used. 
md.reset(); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java index ed6080b..e9ed7e5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java @@ -31,6 +31,8 @@ public abstract class AlterPartitionMessage extends EventMessage { public abstract String getTable(); + public abstract boolean getIsTruncateOp(); + public abstract Map<String,String> getKeyValues(); public abstract Table getTableObj() throws Exception; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java index 5487123..39a87bc 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java @@ -28,6 +28,8 @@ public abstract class AlterTableMessage extends EventMessage { public abstract String getTable(); + public abstract boolean getIsTruncateOp(); + public abstract Table getTableObjBefore() throws Exception; public abstract Table getTableObjAfter() throws Exception; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java 
---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java index a5414d1..8205c25 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.thrift.TException; import java.io.IOException; @@ -30,88 +31,10 @@ import java.util.List; public class EventUtils { - /** - * Utility function that constructs a notification filter to match a given db name and/or table name. - * If dbName == null, fetches all warehouse events. 
- * If dnName != null, but tableName == null, fetches all events for the db - * If dbName != null && tableName != null, fetches all events for the specified table - * @param dbName - * @param tableName - * @return - */ - public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(final String dbName, final String tableName){ - return new IMetaStoreClient.NotificationFilter() { - @Override - public boolean accept(NotificationEvent event) { - if (event == null){ - return false; // get rid of trivial case first, so that we can safely assume non-null - } - if (dbName == null){ - return true; // if our dbName is null, we're interested in all wh events - } - if (dbName.equalsIgnoreCase(event.getDbName())){ - if ( (tableName == null) - // if our dbName is equal, but tableName is blank, we're interested in this db-level event - || (tableName.equalsIgnoreCase(event.getTableName())) - // table level event that matches us - ){ - return true; - } - } - return false; - } - }; - } - - public static IMetaStoreClient.NotificationFilter restrictByMessageFormat(final String messageFormat){ - return new IMetaStoreClient.NotificationFilter() { - @Override - public boolean accept(NotificationEvent event) { - if (event == null){ - return false; // get rid of trivial case first, so that we can safely assume non-null - } - if (messageFormat == null){ - return true; // let's say that passing null in will not do any filtering. 
- } - if (messageFormat.equalsIgnoreCase(event.getMessageFormat())){ - return true; - } - return false; - } - }; - } - - public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){ - return new IMetaStoreClient.NotificationFilter() { - @Override - public boolean accept(NotificationEvent event) { - if ( (event == null) || (event.getEventId() < eventFrom) || (event.getEventId() > eventTo)) { - return false; - } - return true; - } - }; - } - - public static IMetaStoreClient.NotificationFilter andFilter( - final IMetaStoreClient.NotificationFilter... filters ) { - return new IMetaStoreClient.NotificationFilter() { - @Override - public boolean accept(NotificationEvent event) { - for (IMetaStoreClient.NotificationFilter filter : filters){ - if (!filter.accept(event)){ - return false; - } - } - return true; - } - }; - } - public interface NotificationFetcher { - public int getBatchSize() throws IOException; - public long getCurrentNotificationEventId() throws IOException; - public List<NotificationEvent> getNextNotificationEvents( + int getBatchSize() throws IOException; + long getCurrentNotificationEventId() throws IOException; + List<NotificationEvent> getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; } @@ -177,7 +100,7 @@ public class EventUtils { public NotificationEventIterator( NotificationFetcher nfetcher, long eventFrom, int maxEvents, String dbName, String tableName) throws IOException { - init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName)); + init(nfetcher, eventFrom, maxEvents, new DatabaseAndTableFilter(dbName, tableName)); // using init(..) instead of this(..) 
because the EventUtils.getDbTblNotificationFilter // is an operation that needs to run before delegating to the other ctor, and this messes up chaining // ctors http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java index 3d16721..6d146e0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java @@ -37,6 +37,12 @@ public abstract class InsertMessage extends EventMessage { public abstract String getTable(); /** + * Getter for the replace flag being insert into/overwrite + * @return Replace flag to represent INSERT INTO or INSERT OVERWRITE (Boolean). + */ + public abstract boolean isReplace(); + + /** * Get the map of partition keyvalues. Will be null if this insert is to a table and not a * partition. * @return Map of partition keyvalues, or null. http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java index aa770f2..1bd52a8 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java @@ -149,9 +149,10 @@ public abstract class MessageFactory { * and some are not yet supported. 
* @param before The table before the alter * @param after The table after the alter + * @param isTruncateOp Flag to denote truncate table * @return */ - public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after); + public abstract AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp); /** * Factory method for DropTableMessage. @@ -175,10 +176,11 @@ public abstract class MessageFactory { * @param table The table in which the partition is being altered * @param before The partition before it was altered * @param after The partition after it was altered + * @param isTruncateOp Flag to denote truncate partition * @return a new AlterPartitionMessage */ public abstract AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after); + Partition after, boolean isTruncateOp); /** * Factory method for DropPartitionMessage. @@ -231,9 +233,10 @@ public abstract class MessageFactory { * @param table Name of the table the insert occurred in * @param partVals Partition values for the partition that the insert occurred in, may be null if * the insert was done into a non-partitioned table + * @param replace Flag to represent if INSERT OVERWRITE or INSERT INTO * @param files Iterator of file created * @return instance of InsertMessage */ public abstract InsertMessage buildInsertMessage(String db, String table, - Map<String, String> partVals, Iterator<String> files); + Map<String, String> partVals, boolean replace, Iterator<String> files); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java index b10b8a8..4fd7f8c 100644 --- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/PartitionFiles.java @@ -22,10 +22,13 @@ import java.util.Iterator; import java.util.List; import com.google.common.collect.Lists; +import org.codehaus.jackson.annotate.JsonProperty; public class PartitionFiles { + @JsonProperty private String partitionName; + @JsonProperty private List<String> files; public PartitionFiles(String partitionName, Iterator<String> files) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java new file mode 100644 index 0000000..d6429f6 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/AndFilter.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +public class AndFilter implements IMetaStoreClient.NotificationFilter { + final IMetaStoreClient.NotificationFilter[] filters; + + public AndFilter(final IMetaStoreClient.NotificationFilter... filters) { + this.filters = filters; + } + + @Override + public boolean accept(final NotificationEvent event) { + for (IMetaStoreClient.NotificationFilter filter : filters) { + if (!filter.accept(event)) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java new file mode 100644 index 0000000..5294063 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/BasicFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +public abstract class BasicFilter implements NotificationFilter { + @Override + public boolean accept(final NotificationEvent event) { + if (event == null) { + return false; // get rid of trivial case first, so that we can safely assume non-null + } + return shouldAccept(event); + } + + abstract boolean shouldAccept(final NotificationEvent event); +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java new file mode 100644 index 0000000..4a7ca6d --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/DatabaseAndTableFilter.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +/** + * Utility function that constructs a notification filter to match a given db name and/or table name. + * If dbName == null, fetches all warehouse events. + * If dbName != null, but tableName == null, fetches all events for the db + * If dbName != null && tableName != null, fetches all events for the specified table + */ +public class DatabaseAndTableFilter extends BasicFilter { + private final String databaseName, tableName; + + public DatabaseAndTableFilter(final String databaseName, final String tableName) { + this.databaseName = databaseName; + this.tableName = tableName; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + if (databaseName == null) { + return true; // if our dbName is null, we're interested in all wh events + } + if (databaseName.equalsIgnoreCase(event.getDbName())) { + if ((tableName == null) + // if our dbName is equal, but tableName is blank, we're interested in this db-level event + || (tableName.equalsIgnoreCase(event.getTableName())) + // table level event that matches us + ) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java ---------------------------------------------------------------------- diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java new file mode 100644 index 0000000..137b4ce --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/EventBoundaryFilter.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +public class EventBoundaryFilter extends BasicFilter { + private final long eventFrom, eventTo; + + public EventBoundaryFilter(final long eventFrom, final long eventTo) { + this.eventFrom = eventFrom; + this.eventTo = eventTo; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + return eventFrom <= event.getEventId() && event.getEventId() <= eventTo; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java new file mode 100644 index 0000000..4e91ee6 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/event/filters/MessageFormatFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.metastore.messaging.event.filters; + +import org.apache.hadoop.hive.metastore.api.NotificationEvent; + +public class MessageFormatFilter extends BasicFilter { + private final String format; + + public MessageFormatFilter(String format) { + this.format = format; + } + + @Override + boolean shouldAccept(final NotificationEvent event) { + if (format == null) { + return true; // let's say that passing null in will not do any filtering. + } + return format.equalsIgnoreCase(event.getMessageFormat()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java index dd1bf3c..bd7776c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterPartitionMessage.java @@ -37,6 +37,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { String server, servicePrincipal, db, table, tableObjJson; @JsonProperty + String isTruncateOp; + + @JsonProperty Long timestamp; @JsonProperty @@ -52,11 +55,12 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { } public JSONAlterPartitionMessage(String server, String servicePrincipal, Table tableObj, - Partition partitionObjBefore, Partition partitionObjAfter, Long timestamp) { + Partition partitionObjBefore, Partition partitionObjAfter, boolean isTruncateOp, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObj.getDbName(); this.table = tableObj.getTableName(); + this.isTruncateOp = 
Boolean.toString(isTruncateOp); this.timestamp = timestamp; this.keyValues = JSONMessageFactory.getPartitionKeyValues(tableObj, partitionObjBefore); try { @@ -95,6 +99,9 @@ public class JSONAlterPartitionMessage extends AlterPartitionMessage { } @Override + public boolean getIsTruncateOp() { return Boolean.parseBoolean(isTruncateOp); } + + @Override public Map<String, String> getKeyValues() { return keyValues; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java index 792015e..58eb1a7 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAlterTableMessage.java @@ -32,6 +32,9 @@ public class JSONAlterTableMessage extends AlterTableMessage { String server, servicePrincipal, db, table, tableObjBeforeJson, tableObjAfterJson; @JsonProperty + String isTruncateOp; + + @JsonProperty Long timestamp; /** @@ -41,11 +44,12 @@ public class JSONAlterTableMessage extends AlterTableMessage { } public JSONAlterTableMessage(String server, String servicePrincipal, Table tableObjBefore, Table tableObjAfter, - Long timestamp) { + boolean isTruncateOp, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = tableObjBefore.getDbName(); this.table = tableObjBefore.getTableName(); + this.isTruncateOp = Boolean.toString(isTruncateOp); this.timestamp = timestamp; try { this.tableObjBeforeJson = JSONMessageFactory.createTableObjJson(tableObjBefore); @@ -82,6 +86,9 @@ public class JSONAlterTableMessage extends AlterTableMessage { } @Override + public boolean 
getIsTruncateOp() { return Boolean.parseBoolean(isTruncateOp); } + + @Override public Table getTableObjBefore() throws Exception { return (Table) JSONMessageFactory.getTObj(tableObjBeforeJson,Table.class); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java index e1316a4..c059d47 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java @@ -40,6 +40,9 @@ public class JSONInsertMessage extends InsertMessage { Long timestamp; @JsonProperty + String replace; + + @JsonProperty List<String> files; @JsonProperty @@ -52,12 +55,13 @@ public class JSONInsertMessage extends InsertMessage { } public JSONInsertMessage(String server, String servicePrincipal, String db, String table, - Map<String, String> partKeyVals, Iterator<String> fileIter, Long timestamp) { + Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter, Long timestamp) { this.server = server; this.servicePrincipal = servicePrincipal; this.db = db; this.table = table; this.timestamp = timestamp; + this.replace = Boolean.toString(replace); this.partKeyVals = partKeyVals; this.files = Lists.newArrayList(fileIter); checkValid(); @@ -99,6 +103,9 @@ public class JSONInsertMessage extends InsertMessage { } @Override + public boolean isReplace() { return Boolean.parseBoolean(replace); } + + @Override public String toString() { try { return JSONMessageDeserializer.mapper.writeValueAsString(this); 
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java index 41732c7..40ef5fb 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; /** * MessageDeserializer implementation, for deserializing from JSON strings. 
@@ -46,6 +47,9 @@ public class JSONMessageDeserializer extends MessageDeserializer { static { mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_IS_GETTERS, false); + mapper.configure(SerializationConfig.Feature.AUTO_DETECT_FIELDS, false); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java index 3406afb..04a4041 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java @@ -28,6 +28,10 @@ import javax.annotation.Nullable; import com.google.common.collect.Iterables; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.Index; @@ -104,8 +108,8 @@ public class JSONMessageFactory extends MessageFactory { } @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after) { - return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, now()); + public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp) { + return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, now()); } @Override @@ -123,8 +127,8 @@ public class 
JSONMessageFactory extends MessageFactory { @Override public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after) { - return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after, + Partition after, boolean isTruncateOp) { + return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, before, after, isTruncateOp, now()); } @@ -161,10 +165,9 @@ public class JSONMessageFactory extends MessageFactory { } @Override - public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, + public InsertMessage buildInsertMessage(String db, String table, Map<String, String> partKeyVals, boolean replace, Iterator<String> fileIter) { - return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, - fileIter, now()); + return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals, replace, fileIter, now()); } private long now() { @@ -298,5 +301,4 @@ public class JSONMessageFactory extends MessageFactory { }; return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass); } - } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java index 63be7b7..10fcbea 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/parser/ExpressionTree.java @@ -284,7 +284,7 @@ public class ExpressionTree { //can only support "=" and "!=" for now, because our JDO lib is buggy when // using objects from map.get() private static final Set<Operator> TABLE_FILTER_OPS = Sets.newHashSet( - 
Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2); + Operator.EQUALS, Operator.NOTEQUALS, Operator.NOTEQUALS2, Operator.LIKE); private void generateJDOFilterOverTables(Map<String, Object> params, FilterBuilder filterBuilder) throws MetaException { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index d378d06..970038d 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -52,6 +52,7 @@ import javax.sql.DataSource; import java.io.IOException; import java.io.PrintWriter; +import java.nio.ByteBuffer; import java.sql.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -147,6 +148,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { static final private Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName()); static private DataSource connPool; + private static DataSource connPoolMutex; static private boolean doRetryOnConnPool = false; private enum OpertaionType { @@ -203,8 +205,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { private int deadlockCnt; private long deadlockRetryInterval; protected HiveConf conf; - protected DatabaseProduct dbProduct; - private SQLGenerator sqlGenerator; + private static DatabaseProduct dbProduct; + private static SQLGenerator sqlGenerator; // (End user) Transaction timeout, in milliseconds. 
private long timeout; @@ -223,7 +225,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { */ private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = new ConcurrentHashMap<>(); private static final String hostname = ServerUtils.hostname(); - private static volatile boolean dumpConfig = true; // Private methods should never catch SQLException and then throw MetaException. The public // methods depend on SQLException coming back so they can detect and handle deadlocks. Private @@ -247,20 +248,36 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { checkQFileTestHack(); - Connection dbConn = null; - // Set up the JDBC connection pool - try { - setupJdbcConnectionPool(conf); - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - determineDatabaseProduct(dbConn); - sqlGenerator = new SQLGenerator(dbProduct, conf); - } catch (SQLException e) { - String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); - LOG.error(msg); - throw new RuntimeException(e); - } - finally { - closeDbConn(dbConn); + synchronized (TxnHandler.class) { + if (connPool == null) { + //only do this once per JVM; useful for support + LOG.info(HiveConfUtil.dumpConfig(conf).toString()); + + Connection dbConn = null; + // Set up the JDBC connection pool + try { + int maxPoolSize = conf.getIntVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_MAX_CONNECTIONS); + long getConnectionTimeoutMs = 30000; + connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs); + /*the mutex pools should ideally be somewhat larger since some operations require 1 + connection from each pool and we want to avoid taking a connection from primary pool + and then blocking because mutex pool is empty. There is only 1 thread in any HMS trying + to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock. The CheckLock operation gets a + connection from connPool first, then connPoolMutex. 
All others, go in the opposite + order (not very elegant...). So number of connection requests for connPoolMutex cannot + exceed (size of connPool + MUTEX_KEY.values().length - 1).*/ + connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + determineDatabaseProduct(dbConn); + sqlGenerator = new SQLGenerator(dbProduct, conf); + } catch (SQLException e) { + String msg = "Unable to instantiate JDBC connection pooling, " + e.getMessage(); + LOG.error(msg); + throw new RuntimeException(e); + } finally { + closeDbConn(dbConn); + } + } } timeout = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS); @@ -270,11 +287,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { retryLimit = HiveConf.getIntVar(conf, HiveConf.ConfVars.HMSHANDLERATTEMPTS); deadlockRetryInterval = retryInterval / 10; maxOpenTxns = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS); - if(dumpConfig) { - LOG.info(HiveConfUtil.dumpConfig(conf).toString()); - //only do this once per JVM; useful for support - dumpConfig = false; - } } @Override @RetrySemantics.ReadOnly @@ -367,7 +379,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { /** * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()} -\ */ + */ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); String s = "select ntxn_next - 1 from NEXT_TXN_ID"; @@ -383,23 +395,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { "initialized, null record found in next_txn_id"); } close(rs); - Set<Long> openList = new HashSet<Long>(); + List<Long> openList = new ArrayList<Long>(); //need the WHERE clause below to ensure consistent results with READ_COMMITTED - s = "select txn_id, txn_state from TXNS where txn_id <= " + hwm; + s = "select txn_id, txn_state from TXNS 
where txn_id <= " + hwm + " order by txn_id"; LOG.debug("Going to execute query<" + s + ">"); rs = stmt.executeQuery(s); long minOpenTxn = Long.MAX_VALUE; + BitSet abortedBits = new BitSet(); while (rs.next()) { long txnId = rs.getLong(1); openList.add(txnId); char c = rs.getString(2).charAt(0); if(c == TXN_OPEN) { minOpenTxn = Math.min(minOpenTxn, txnId); + } else if (c == TXN_ABORTED) { + abortedBits.set(openList.size() - 1); } } LOG.debug("Going to rollback"); dbConn.rollback(); - GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList); + ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray()); + GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer); if(minOpenTxn < Long.MAX_VALUE) { otr.setMin_open_txn(minOpenTxn); } @@ -844,7 +860,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { /** * As much as possible (i.e. in absence of retries) we want both operations to be done on the same * connection (but separate transactions). This avoid some flakiness in BONECP where if you - * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one + * perform an operation on 1 connection and immediately get another from the pool, the 2nd one * doesn't see results of the first. * * Retry-by-caller note: If the call to lock is from a transaction, then in the worst case @@ -983,6 +999,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { case SELECT: updateTxnComponents = false; break; + case NO_TXN: + /*this constant is a bit of a misnomer since we now always have a txn context. It + just means the operation is such that we don't care what tables/partitions it + affected as it doesn't trigger a compaction or conflict detection. 
A better name + would be NON_TRANSACTIONAL.*/ + updateTxnComponents = false; + break; default: //since we have an open transaction, only 4 values above are expected throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType() @@ -1934,7 +1957,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } - protected Connection getDbConn(int isolationLevel) throws SQLException { + Connection getDbConn(int isolationLevel) throws SQLException { + return getDbConn(isolationLevel, connPool); + } + private Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException { int rc = doRetryOnConnPool ? 10 : 1; Connection dbConn = null; while (true) { @@ -2457,14 +2483,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { response.setLockid(extLockId); LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId)); - Savepoint save = dbConn.setSavepoint();//todo: get rid of this + Savepoint save = dbConn.setSavepoint(); StringBuilder query = new StringBuilder("select hl_lock_ext_id, " + "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " + "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in ("); Set<String> strings = new HashSet<String>(locksBeingChecked.size()); - //This the set of entities that the statement represnted by extLockId wants to update + //This the set of entities that the statement represented by extLockId wants to update List<LockInfo> writeSet = new ArrayList<>(); for (LockInfo info : locksBeingChecked) { @@ -3131,9 +3157,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } - private static synchronized void setupJdbcConnectionPool(HiveConf conf) throws SQLException { - if (connPool != null) return; - + private static synchronized DataSource setupJdbcConnectionPool(HiveConf conf, int maxPoolSize, long getConnectionTimeoutMs) throws SQLException { String driverUrl = HiveConf.getVar(conf, 
HiveConf.ConfVars.METASTORECONNECTURLKEY); String user = getMetastoreJdbcUser(conf); String passwd = getMetastoreJdbcPasswd(conf); @@ -3143,33 +3167,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if ("bonecp".equals(connectionPooler)) { BoneCPConfig config = new BoneCPConfig(); config.setJdbcUrl(driverUrl); - //if we are waiting for connection for 60s, something is really wrong + //if we are waiting for connection for a long time, something is really wrong //better raise an error than hang forever - config.setConnectionTimeoutInMs(60000); - config.setMaxConnectionsPerPartition(10); + //see DefaultConnectionStrategy.getConnectionInternal() + config.setConnectionTimeoutInMs(getConnectionTimeoutMs); + config.setMaxConnectionsPerPartition(maxPoolSize); config.setPartitionCount(1); config.setUser(user); config.setPassword(passwd); - connPool = new BoneCPDataSource(config); doRetryOnConnPool = true; // Enable retries to work around BONECP bug. + return new BoneCPDataSource(config); } else if ("dbcp".equals(connectionPooler)) { - ObjectPool objectPool = new GenericObjectPool(); + GenericObjectPool objectPool = new GenericObjectPool(); + //https://commons.apache.org/proper/commons-pool/api-1.6/org/apache/commons/pool/impl/GenericObjectPool.html#setMaxActive(int) + objectPool.setMaxActive(maxPoolSize); + objectPool.setMaxWait(getConnectionTimeoutMs); ConnectionFactory connFactory = new DriverManagerConnectionFactory(driverUrl, user, passwd); // This doesn't get used, but it's still necessary, see // http://svn.apache.org/viewvc/commons/proper/dbcp/branches/DBCP_1_4_x_BRANCH/doc/ManualPoolingDataSourceExample.java?view=markup PoolableConnectionFactory poolConnFactory = new PoolableConnectionFactory(connFactory, objectPool, null, null, false, true); - connPool = new PoolingDataSource(objectPool); + return new PoolingDataSource(objectPool); } else if ("hikaricp".equals(connectionPooler)) { HikariConfig config = new HikariConfig(); + 
config.setMaximumPoolSize(maxPoolSize); config.setJdbcUrl(driverUrl); config.setUsername(user); config.setPassword(passwd); + //https://github.com/brettwooldridge/HikariCP + config.setConnectionTimeout(getConnectionTimeoutMs); - connPool = new HikariDataSource(config); + return new HikariDataSource(config); } else if ("none".equals(connectionPooler)) { LOG.info("Choosing not to pool JDBC connections"); - connPool = new NoPoolConnectionPool(conf); + return new NoPoolConnectionPool(conf); } else { throw new RuntimeException("Unknown JDBC connection pooling " + connectionPooler); } @@ -3427,7 +3458,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { String sqlStmt = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0"); lockInternal(); - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); stmt = dbConn.createStatement(); if(LOG.isDebugEnabled()) { LOG.debug("About to execute SQL: " + sqlStmt); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 517eec3..6e0070b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -32,9 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.BitSet; import java.util.List; import java.util.Map; -import java.util.Set; public class TxnUtils { private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class); @@ -50,8 +50,13 @@ public class TxnUtils { * @return a valid txn list. 
*/ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { + /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 + * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which + * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should + * inlude the latest committed set.*/ long highWater = txns.getTxn_high_water_mark(); - Set<Long> open = txns.getOpen_txns(); + List<Long> open = txns.getOpen_txns(); + BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; int i = 0; for(long txn: open) { @@ -59,10 +64,10 @@ public class TxnUtils { exceptions[i++] = txn; } if(txns.isSetMin_open_txn()) { - return new ValidReadTxnList(exceptions, highWater, txns.getMin_open_txn()); + return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn()); } else { - return new ValidReadTxnList(exceptions, highWater); + return new ValidReadTxnList(exceptions, abortedBits, highWater); } } @@ -93,7 +98,9 @@ public class TxnUtils { exceptions = Arrays.copyOf(exceptions, i); } highWater = minOpenTxn == Long.MAX_VALUE ? 
highWater : minOpenTxn - 1; - return new ValidCompactorTxnList(exceptions, highWater); + BitSet bitSet = new BitSet(exceptions.length); + bitSet.set(0, bitSet.length()); // for ValidCompactorTxnList, everything in exceptions are aborted + return new ValidCompactorTxnList(exceptions, bitSet, highWater); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/model/package.jdo ---------------------------------------------------------------------- diff --git a/metastore/src/model/package.jdo b/metastore/src/model/package.jdo index 844bc46..67e2c20 100644 --- a/metastore/src/model/package.jdo +++ b/metastore/src/model/package.jdo @@ -63,10 +63,10 @@ <class name="MFieldSchema" embedded-only="true" table="TYPE_FIELDS" detachable="true"> <field name="name"> - <column name="FNAME" length="128" jdbc-type="VARCHAR"/> + <column name="FNAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="type" > - <column name="FTYPE" length="4000" jdbc-type="VARCHAR" allows-null="false"/> + <column name="FTYPE" jdbc-type="CLOB" allows-null="false"/> </field> <field name="comment" > <column name="FCOMMENT" length="4000" jdbc-type="VARCHAR" allows-null="true"/> @@ -118,7 +118,7 @@ <column name="DB_ID"/> </index> <field name="tableName"> - <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/> </field> <field name="database"> <column name="DB_ID"/> @@ -170,7 +170,7 @@ <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/> </key> <value> - <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/> + <column name="PARAM_VALUE" jdbc-type="CLOB"/> </value> </field> <field name="viewOriginalText" default-fetch-group="false"> @@ -251,14 +251,14 @@ <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/> </key> <value> - <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/> + <column name="PARAM_VALUE" jdbc-type="CLOB"/> </value> </field> </class> <class name="MOrder" 
embedded-only="true" table="SORT_ORDER" detachable="true"> <field name="col"> - <column name="COL_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="COL_NAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="order"> <column name="ORDER" jdbc-type="INTEGER" allows-null="false"/> @@ -280,10 +280,10 @@ <element> <embedded> <field name="name"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="type"> - <column name="TYPE_NAME" length="4000" jdbc-type="VARCHAR" allows-null="false"/> + <column name="TYPE_NAME" jdbc-type="CLOB" allows-null="false"/> </field> <field name="comment"> <column name="COMMENT" length="256" jdbc-type="VARCHAR" allows-null="true"/> @@ -349,7 +349,7 @@ <element> <embedded> <field name="col"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="order"> <column name="ORDER" jdbc-type="INTEGER" allows-null="false"/> @@ -366,7 +366,7 @@ <column name="PARAM_KEY" length="256" jdbc-type="VARCHAR"/> </key> <value> - <column name="PARAM_VALUE" length="4000" jdbc-type="VARCHAR"/> + <column name="PARAM_VALUE" jdbc-type="CLOB"/> </value> </field> <field name="skewedColNames" table="SKEWED_COL_NAMES"> @@ -725,7 +725,7 @@ <column name="TBL_ID" /> </field> <field name="columnName"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="privilege"> <column name="TBL_COL_PRIV" length="128" jdbc-type="VARCHAR"/> @@ -770,7 +770,7 @@ <column name="PART_ID" /> </field> <field name="columnName"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR"/> </field> <field name="privilege"> <column name="PART_COL_PRIV" length="128" jdbc-type="VARCHAR"/> @@ -803,7 +803,7 @@ <column name="DB_NAME" 
length="128" jdbc-type="VARCHAR"/> </field> <field name="tblName"> - <column name="TBL_NAME" length="128" jdbc-type="VARCHAR"/> + <column name="TBL_NAME" length="256" jdbc-type="VARCHAR"/> </field> <field name="partName"> <column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR"/> @@ -850,13 +850,13 @@ <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="tableName"> - <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> + <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="table"> <column name="TBL_ID"/> </field> <field name="colName"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="colType"> <column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/> @@ -911,7 +911,7 @@ <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="tableName"> - <column name="TABLE_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> + <column name="TABLE_NAME" length="256" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="partitionName"> <column name="PARTITION_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/> @@ -920,7 +920,7 @@ <column name="PART_ID"/> </field> <field name="colName"> - <column name="COLUMN_NAME" length="128" jdbc-type="VARCHAR" allows-null="false"/> + <column name="COLUMN_NAME" length="767" jdbc-type="VARCHAR" allows-null="false"/> </field> <field name="colType"> <column name="COLUMN_TYPE" length="128" jdbc-type="VARCHAR" allows-null="false"/> @@ -1050,7 +1050,7 @@ <column name="DB_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/> </field> <field name="tableName"> - <column name="TBL_NAME" length="128" jdbc-type="VARCHAR" allows-null="true"/> + <column name="TBL_NAME" length="256" 
jdbc-type="VARCHAR" allows-null="true"/> </field> <field name="message"> <column name="MESSAGE" jdbc-type="LONGVARCHAR"/> http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index 64da9b4..7760bc7 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; @@ -873,6 +874,13 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { } @Override + public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + String tableName) throws MetaException, NoSuchObjectException { + // TODO Auto-generated method stub + return null; + } + + @Override public void createTableWrite(Table tbl, long writeId, char state, long heartbeat) { }
