HIVE-14645 : table conversion to and from MM (Sergey Shelukhin)
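The patch adds ALTER TABLE support for switching a table between regular and MM (insert-only) layout by setting or unsetting the "transactional" and "transactional_properties" table properties. DDLTask now generates follow-up MoveTask/ImportCommitTask children that either move existing files into a first mm_ write-id directory (conversion to MM) or strip the mm_ prefix from committed directories and delete uncommitted ones (conversion from MM); DDLSemanticAnalyzer takes an exclusive DDL lock when these properties change, and a new AcidUtils.isFullAcidTable() replaces isAcidTable() at call sites that must exclude insert-only tables from full-ACID handling.

As a quick reference for the two property checks the patch leans on, here is a minimal standalone sketch, not the Hive code itself: it mirrors MetaStoreUtils.isInsertOnlyTable and the new AcidUtils.isFullAcidTable, with the hive_metastoreConstants keys assumed to be the literal strings "transactional" and "transactional_properties".

import java.util.Map;

public class MmTablePropsSketch {

  // Assumed literal values of hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
  // and TABLE_TRANSACTIONAL_PROPERTIES.
  static final String TRANSACTIONAL = "transactional";
  static final String TRANSACTIONAL_PROPERTIES = "transactional_properties";

  /**
   * Insert-only (MM) check, mirroring MetaStoreUtils.isInsertOnlyTable; the diff's
   * TODO# notes this should eventually also require transactional=true.
   */
  static boolean isInsertOnly(Map<String, String> params) {
    return "insert_only".equalsIgnoreCase(params.get(TRANSACTIONAL_PROPERTIES));
  }

  /**
   * Full-ACID check, mirroring the new AcidUtils.isFullAcidTable:
   * transactional but not insert-only.
   */
  static boolean isFullAcid(Map<String, String> params) {
    return "true".equalsIgnoreCase(params.get(TRANSACTIONAL)) && !isInsertOnly(params);
  }

  public static void main(String[] args) {
    Map<String, String> mmProps =
        Map.of(TRANSACTIONAL, "true", TRANSACTIONAL_PROPERTIES, "insert_only");
    System.out.println(isInsertOnly(mmProps)); // true
    System.out.println(isFullAcid(mmProps));   // false
  }
}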
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e083d33a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e083d33a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e083d33a

Branch: refs/heads/hive-14535
Commit: e083d33ac484d0ec99bc7789d7fc633723a1d634
Parents: 0f3998a
Author: Sergey Shelukhin <[email protected]>
Authored: Thu Oct 27 18:45:28 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Thu Oct 27 18:45:28 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |   9 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   4 +-
 .../hadoop/hive/metastore/MetaStoreUtils.java   |   1 +
 .../hadoop/hive/metastore/ObjectStore.java      |   1 +
 .../TransactionalValidationListener.java        |   7 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  12 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 222 ++++-
 .../hadoop/hive/ql/exec/ImportCommitTask.java   |   1 +
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   1 +
 .../apache/hadoop/hive/ql/exec/StatsTask.java   |   4 +-
 .../hadoop/hive/ql/hooks/WriteEntity.java       |   3 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   4 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   6 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  14 +-
 .../apache/hadoop/hive/ql/metadata/Table.java   |   3 +
 .../BucketingSortingReduceSinkOptimizer.java    |   2 +-
 .../hive/ql/optimizer/StatsOptimizer.java       |   2 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |  28 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  14 +-
 .../hadoop/hive/ql/plan/TableScanDesc.java      |   2 +-
 .../queries/clientpositive/mm_conversions.q     |  85 ++
 .../clientpositive/llap/mm_conversions.q.out    | 951 +++++++++++++++++++
 22 files changed, 1317 insertions(+), 59 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index c61b63a..78ec3ad 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +93,14 @@ public class ValidWriteIds {
     conf.set(createConfKey(dbName, tblName), source);
   }
 
+  public static void clearConf(HiveConf conf, String dbName, String tblName) {
+    if (LOG.isDebugEnabled()) {
+      // TODO# remove
+      LOG.debug("Unsetting " + createConfKey(dbName, tblName));
+    }
+    conf.unset(createConfKey(dbName, tblName));
+  }
+
   public String toString() {
     // TODO: lifted from ACID config implementation... optimize if needed? e.g.
ranges, base64 StringBuilder buf = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 4436f3a..68b00f3 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6457,6 +6457,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0; tbl.setMmNextWriteId(writeId + 1); ms.alterTable(dbName, tblName, tbl); + ok = true; } finally { if (!ok) { @@ -6608,7 +6609,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { Iterator<Long> iter = ids.iterator(); long oldWatermarkId = watermarkId; while (iter.hasNext()) { - if (iter.next() != watermarkId + 1) break; + Long nextWriteId = iter.next(); + if (nextWriteId != watermarkId + 1) break; ++watermarkId; } long removed = watermarkId - oldWatermarkId; http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java index 3ee1f1c..45eef36 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java @@ -1892,6 +1892,7 @@ public class MetaStoreUtils { * @param params table properties * @return true if table is an INSERT_ONLY table, false otherwise */ + // TODO# also check that transactional is true public static boolean isInsertOnlyTable(Map<String, String> params) { String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); return transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp); http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index 8ad7059..a1b3a09 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -636,6 +636,7 @@ public class ObjectStore implements RawStore, Configurable { transactionStatus = TXN_STATUS.COMMITED; try { + LOG.error("TODO# grrrrr"); currentTransaction.commit(); } catch (Exception ex) { Throwable candidate = ex; http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java index f942479..ed05381 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java +++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/TransactionalValidationListener.java @@ -139,7 +139,7 @@ public final class TransactionalValidationListener extends MetaStorePreEventList hasValidTransactionalValue = true; } - if (!hasValidTransactionalValue) { + if (!hasValidTransactionalValue && !MetaStoreUtils.isInsertOnlyTable(oldTable.getParameters())) { // if here, there is attempt to set transactional to something other than 'true' // and NOT the same value it was before throw new MetaException("TBLPROPERTIES with 'transactional'='true' cannot be unset"); @@ -156,8 +156,9 @@ public final class TransactionalValidationListener extends MetaStorePreEventList // 'transactional_properties' must match the old value. Any attempt to alter the previous // value will throw an error. An exception will still be thrown if the previous value was // null and an attempt is made to set it. This behaviour can be changed in the future. - if (oldTransactionalPropertiesValue == null - || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue) ) { + if ((oldTransactionalPropertiesValue == null + || !oldTransactionalPropertiesValue.equalsIgnoreCase(transactionalPropertiesValue)) + && !MetaStoreUtils.isInsertOnlyTable(oldTable.getParameters())) { throw new MetaException("TBLPROPERTIES with 'transactional_properties' cannot be " + "altered after the table is created"); } http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 19f743b..8f3ab3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1580,8 +1580,13 @@ public class Driver implements CommandProcessor { private static void acquireWriteIds(QueryPlan plan, HiveConf conf) throws HiveException { // Output IDs are put directly into FileSinkDesc; here, we only need to take care of inputs. for (ReadEntity input : plan.getInputs()) { - Table t = extractMmTable(input); + Table t = extractTable(input); if (t == null) continue; + Utilities.LOG14535.info("Checking " + t.getTableName() + " for being a MM table: " + t.getParameters()); + if (!MetaStoreUtils.isInsertOnlyTable(t.getParameters())) { + ValidWriteIds.clearConf(conf, t.getDbName(), t.getTableName()); + continue; + } ValidWriteIds ids = Hive.get().getValidWriteIdsForTable(t.getDbName(), t.getTableName()); ids.addToConf(conf, t.getDbName(), t.getTableName()); if (plan.getFetchTask() != null) { @@ -1590,7 +1595,7 @@ public class Driver implements CommandProcessor { } } - private static Table extractMmTable(ReadEntity input) { + private static Table extractTable(ReadEntity input) { Table t = null; switch (input.getType()) { case TABLE: @@ -1602,8 +1607,7 @@ public class Driver implements CommandProcessor { break; default: return null; } - return (t != null && !t.isTemporary() - && MetaStoreUtils.isInsertOnlyTable(t.getParameters())) ? t : null; + return (t != null && !t.isTemporary()) ? 
t : null; } private CommandProcessorResponse rollback(CommandProcessorResponse cpr) { http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 7cf83d8..b5ec3cb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -58,6 +58,7 @@ import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; @@ -91,6 +92,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; @@ -163,8 +165,10 @@ import org.apache.hadoop.hive.ql.plan.FileMergeDesc; import org.apache.hadoop.hive.ql.plan.GrantDesc; import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL; import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; +import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc; import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.MsckDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc; @@ -238,6 +242,7 @@ import org.slf4j.LoggerFactory; import org.stringtemplate.v4.ST; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** * DDLTask implementation. @@ -1788,7 +1793,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException { Table tbl = db.getTable(desc.getTableName()); - if (!AcidUtils.isAcidTable(tbl)) { + if (!AcidUtils.isFullAcidTable(tbl)) { throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, tbl.getDbName(), tbl.getTableName()); } @@ -3349,14 +3354,16 @@ public class DDLTask extends Task<DDLWork> implements Serializable { // Don't change the table object returned by the metastore, as we'll mess with it's caches. Table oldTbl = tbl; tbl = oldTbl.copy(); + // Handle child tasks here. We could add them directly whereever we need, + // but let's make it a little bit more explicit. if (allPartitions != null) { // Alter all partitions for (Partition part : allPartitions) { - alterTableOrSinglePartition(alterTbl, tbl, part); + addChildTasks(alterTableOrSinglePartition(alterTbl, tbl, part)); } } else { // Just alter the table - alterTableOrSinglePartition(alterTbl, tbl, null); + addChildTasks(alterTableOrSinglePartition(alterTbl, tbl, null)); } if (allPartitions == null) { @@ -3368,6 +3375,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } } + // TODO# WRONG!! 
HERE try { if (allPartitions == null) { db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), alterTbl.getEnvironmentContext()); @@ -3429,6 +3437,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return addIfAbsentByName(newWriteEntity, work.getOutputs()); } + private void addChildTasks(List<Task<?>> extraTasks) { + if (extraTasks == null) return; + for (Task<?> newTask : extraTasks) { + addDependentTask(newTask); + } + } + private boolean isSchemaEvolutionEnabled(Table tbl) { boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata()); if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) { @@ -3437,8 +3452,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return false; } - private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part) - throws HiveException { + private List<Task<?>> alterTableOrSinglePartition( + AlterTableDesc alterTbl, Table tbl, Partition part) throws HiveException { if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.RENAME) { tbl.setDbName(Utilities.getDatabaseName(alterTbl.getNewName())); @@ -3576,20 +3591,9 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } sd.setCols(alterTbl.getNewCols()); } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) { - if (part != null) { - part.getTPartition().getParameters().putAll(alterTbl.getProps()); - } else { - tbl.getTTable().getParameters().putAll(alterTbl.getProps()); - } + return alterTableAddProps(alterTbl, tbl, part); } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.DROPPROPS) { - Iterator<String> keyItr = alterTbl.getProps().keySet().iterator(); - while (keyItr.hasNext()) { - if (part != null) { - part.getTPartition().getParameters().remove(keyItr.next()); - } else { - tbl.getTTable().getParameters().remove(keyItr.next()); - } - } + return alterTableDropProps(alterTbl, tbl, part); } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDEPROPS) { StorageDescriptor sd = (part == null ? 
tbl.getTTable().getSd() : part.getTPartition().getSd()); sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps()); @@ -3724,12 +3728,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } else if (alterTbl.getOp() == AlterTableTypes.ALTERBUCKETNUM) { if (part != null) { if (part.getBucketCount() == alterTbl.getNumberBuckets()) { - return 0; + return null; } part.setBucketCount(alterTbl.getNumberBuckets()); } else { if (tbl.getNumBuckets() == alterTbl.getNumberBuckets()) { - return 0; + return null; } tbl.setNumBuckets(alterTbl.getNumberBuckets()); } @@ -3737,7 +3741,183 @@ public class DDLTask extends Task<DDLWork> implements Serializable { throw new HiveException(ErrorMsg.UNSUPPORTED_ALTER_TBL_OP, alterTbl.getOp().toString()); } - return 0; + return null; + } + + private List<Task<?>> alterTableDropProps( + AlterTableDesc alterTbl, Table tbl, Partition part) throws HiveException { + List<Task<?>> result = null; + if (part == null) { + Set<String> removedSet = alterTbl.getProps().keySet(); + boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()), + isRemoved = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES) + || removedSet.contains(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (isFromMmTable && isRemoved) { + result = generateRemoveMmTasks(tbl); + } + } + Iterator<String> keyItr = alterTbl.getProps().keySet().iterator(); + while (keyItr.hasNext()) { + if (part != null) { + part.getTPartition().getParameters().remove(keyItr.next()); + } else { + tbl.getTTable().getParameters().remove(keyItr.next()); + } + } + return result; + } + + private List<Task<?>> generateRemoveMmTasks(Table tbl) throws HiveException { + // To avoid confusion from nested MM directories when table is converted back and forth, + // we will do the following - we will rename mm_ dirs to remove the prefix; we will also + // delete any directories that are not committed. Note that this relies on locks. + // Note also that we only do the renames AFTER the metastore operation commits. + // Deleting uncommitted things is safe, but moving stuff before we convert is data loss. + List<Path> allMmDirs = new ArrayList<>(); + if (tbl.isStoredAsSubDirectories()) { + // TODO: support this? + throw new HiveException("Converting list bucketed tables stored as subdirectories " + + " to and from MM is not supported"); + } + Hive db = getHive(); + ValidWriteIds ids = db.getValidWriteIdsForTable(tbl.getDbName(), tbl.getTableName()); + if (tbl.getPartitionKeys().size() > 0) { + PartitionIterable parts = new PartitionIterable(db, tbl, null, + HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + Iterator<Partition> partIter = parts.iterator(); + while (partIter.hasNext()) { + Partition part = partIter.next(); + checkMmLb(part); + handleRemoveMm(part.getDataLocation(), ids, allMmDirs); + } + } else { + checkMmLb(tbl); + handleRemoveMm(tbl.getDataLocation(), ids, allMmDirs); + } + List<Path> targetPaths = new ArrayList<>(allMmDirs.size()); + int prefixLen = ValidWriteIds.MM_PREFIX.length(); + for (int i = 0; i < allMmDirs.size(); ++i) { + Path src = allMmDirs.get(i); + Path tgt = new Path(src.getParent(), src.getName().substring(prefixLen + 1)); + Utilities.LOG14535.info("Will move " + src + " to " + tgt); + targetPaths.add(tgt); + } + // Don't set inputs and outputs - the locks have already been taken so it's pointless. 
+ MoveWork mw = new MoveWork(null, null, null, null, false); + mw.setMultiFilesDesc(new LoadMultiFilesDesc(allMmDirs, targetPaths, true, null, null)); + return Lists.<Task<?>>newArrayList(TaskFactory.get(mw, conf)); + } + + private void checkMmLb(Table tbl) throws HiveException { + if (!tbl.isStoredAsSubDirectories()) return; + // TODO: support this? + throw new HiveException("Converting list bucketed tables stored as subdirectories " + + " to and from MM is not supported"); + } + + private void checkMmLb(Partition part) throws HiveException { + if (!part.isStoredAsSubDirectories()) return; + // TODO: support this? + throw new HiveException("Converting list bucketed tables stored as subdirectories " + + " to and from MM is not supported. Please create a table in the desired format."); + } + + private void handleRemoveMm( + Path path, ValidWriteIds ids, List<Path> result) throws HiveException { + // Note: doesn't take LB into account; that is not presently supported here (throws above). + try { + FileSystem fs = path.getFileSystem(conf); + for (FileStatus file : fs.listStatus(path)) { + Path childPath = file.getPath(); + if (!file.isDirectory()) { + ensureDelete(fs, childPath, "a non-directory file"); + continue; + } + Long writeId = ValidWriteIds.extractWriteId(childPath); + if (writeId == null) { + ensureDelete(fs, childPath, "an unknown directory"); + } else if (!ids.isValid(writeId)) { + // Assume no concurrent active writes - we rely on locks here. We could check and fail. + ensureDelete(fs, childPath, "an uncommitted directory"); + } else { + result.add(childPath); + } + } + } catch (IOException ex) { + throw new HiveException(ex); + } + } + + private static void ensureDelete(FileSystem fs, Path path, String what) throws IOException { + Utilities.LOG14535.info("Deleting " + what + " " + path); + try { + if (!fs.delete(path, true)) throw new IOException("delete returned false"); + } catch (Exception ex) { + String error = "Couldn't delete " + path + "; cannot remove MM setting from the table"; + LOG.error(error, ex); + throw (ex instanceof IOException) ? (IOException)ex : new IOException(ex); + } + } + + private List<Task<?>> generateAddMmTasks(Table tbl) throws HiveException { + // We will move all the files in the table/partition directories into the first MM + // directory, then commit the first write ID. + List<Path> srcs = new ArrayList<>(), tgts = new ArrayList<>(); + Hive db = getHive(); + long mmWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName()); + String mmDir = ValidWriteIds.getMmFilePrefix(mmWriteId); + if (tbl.getPartitionKeys().size() > 0) { + PartitionIterable parts = new PartitionIterable(db, tbl, null, + HiveConf.getIntVar(conf, ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); + Iterator<Partition> partIter = parts.iterator(); + while (partIter.hasNext()) { + Partition part = partIter.next(); + checkMmLb(part); + Path src = part.getDataLocation(), tgt = new Path(src, mmDir); + srcs.add(src); + tgts.add(tgt); + Utilities.LOG14535.info("Will move " + src + " to " + tgt); + } + } else { + checkMmLb(tbl); + Path src = tbl.getDataLocation(), tgt = new Path(src, mmDir); + srcs.add(src); + tgts.add(tgt); + Utilities.LOG14535.info("Will move " + src + " to " + tgt); + } + // Don't set inputs and outputs - the locks have already been taken so it's pointless. 
+ MoveWork mw = new MoveWork(null, null, null, null, false); + mw.setMultiFilesDesc(new LoadMultiFilesDesc(srcs, tgts, true, null, null)); + ImportCommitWork icw = new ImportCommitWork(tbl.getDbName(), tbl.getTableName(), mmWriteId); + // TODO# this is hacky and will be gone with ACID. The problem is getting the write ID above + // modifies the table, but the table object above is preserved and modified without + // getting this change, so saving it will overwrite write ID. Ideally, when we save + // only specific fields, and not overwrite write ID every time we alter table. + // There's probably some way in DN to achieve that, but for now let's just update the + // original object here. This is safe due to DDL lock and the fact that converting + // the table to MM here from non-MM should mean no concurrent write ID updates. + tbl.setMmNextWriteId(mmWriteId + 1); + Task<?> mv = TaskFactory.get(mw, conf), ic = TaskFactory.get(icw, conf); + mv.addDependentTask(ic); + return Lists.<Task<?>>newArrayList(mv); + } + + private List<Task<?>> alterTableAddProps(AlterTableDesc alterTbl, Table tbl, + Partition part) throws HiveException { + List<Task<?>> result = null; + if (part != null) { + part.getTPartition().getParameters().putAll(alterTbl.getProps()); + } else { + boolean isFromMmTable = MetaStoreUtils.isInsertOnlyTable(tbl.getParameters()), + isToMmTable = MetaStoreUtils.isInsertOnlyTable(alterTbl.getProps()); + if (!isFromMmTable && isToMmTable) { + result = generateAddMmTasks(tbl); + } else if (isFromMmTable && !isToMmTable) { + result = generateRemoveMmTasks(tbl); + } + tbl.getTTable().getParameters().putAll(alterTbl.getProps()); + } + return result; } private int dropConstraint(Hive db, AlterTableDesc alterTbl) http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java index efa9bc3..ba009b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java @@ -39,6 +39,7 @@ public class ImportCommitTask extends Task<ImportCommitWork> { try { if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) { + Utilities.LOG14535.info("Exiting due to explain"); return 0; } Hive db = getHive(); http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 22843c9..76e399e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -278,6 +278,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. + // This is also used for MM table conversion. 
LoadMultiFilesDesc lmfd = work.getLoadMultiFilesWork(); if (lmfd != null) { boolean isDfsDir = lmfd.getIsDfsDir(); http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java index 9e528b5..ad5728f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java @@ -172,7 +172,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable { // work.getLoadTableDesc().getReplace() is true means insert overwrite command // work.getLoadFileDesc().getDestinationCreateTable().isEmpty() means CTAS etc. // acidTable will not have accurate stats unless it is set through analyze command. - if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) { + if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) { StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE); } else if (work.getTableSpecs() != null || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace()) @@ -222,7 +222,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable { // org.apache.hadoop.hive.metastore.api.Partition tPart = partn.getTPartition(); Map<String, String> parameters = tPart.getParameters(); - if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) { + if (work.getTableSpecs() == null && AcidUtils.isFullAcidTable(table)) { StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE); } else if (work.getTableSpecs() != null || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace()) http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java index 9e18638..1c18975 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java @@ -212,7 +212,8 @@ public class WriteEntity extends Entity implements Serializable { case ADDPARTITION: case ADDSERDEPROPS: - case ADDPROPS: return WriteType.DDL_SHARED; + case ADDPROPS: + return WriteType.DDL_SHARED; case COMPACT: case TOUCH: return WriteType.DDL_NO_LOCK; http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 2dfbc8d..26c65f9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -1108,6 +1108,10 @@ public class AcidUtils { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + public static boolean isFullAcidTable(Table table) { + return isAcidTable(table) && !MetaStoreUtils.isInsertOnlyTable(table.getParameters()); + } + /** * Sets the acidOperationalProperties in the configuration object argument. 
* @param conf Mutable configuration object http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index da7505b..40d055a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -230,7 +230,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } - if(t != null && AcidUtils.isAcidTable(t)) { + if(t != null && AcidUtils.isFullAcidTable(t)) { compBuilder.setIsAcid(true); } LockComponent comp = compBuilder.build(); @@ -261,7 +261,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { case INSERT: t = getTable(output); - if(AcidUtils.isAcidTable(t)) { + if(AcidUtils.isFullAcidTable(t)) { compBuilder.setShared(); compBuilder.setIsAcid(true); } @@ -318,7 +318,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } - if(t != null && AcidUtils.isAcidTable(t)) { + if(t != null && AcidUtils.isFullAcidTable(t)) { compBuilder.setIsAcid(true); } LockComponent comp = compBuilder.build(); http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 4a8df95..a1ec7e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2508,7 +2508,6 @@ private void constructOneLBLocationMap(FileStatus fSta, * @param tbl * object for which partition is needed * @return list of partition objects - * @throws HiveException */ public List<Partition> getPartitions(Table tbl) throws HiveException { if (tbl.isPartitioned()) { @@ -3143,13 +3142,14 @@ private void constructOneLBLocationMap(FileStatus fSta, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); HdfsUtils.HadoopFileStatus destStatus = null; - // If source path is a subdirectory of the destination path: + // If source path is a subdirectory of the destination path (or the other way around): // ex: INSERT OVERWRITE DIRECTORY 'target/warehouse/dest4.out' SELECT src.value WHERE src.key >= 300; // where the staging directory is a subdirectory of the destination directory // (1) Do not delete the dest dir before doing the move operation. // (2) It is assumed that subdir and dir are in same encryption zone. // (3) Move individual files from scr dir to dest dir. 
- boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal); + boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal), + destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false); try { if (inheritPerms || replace) { try{ @@ -3159,7 +3159,7 @@ private void constructOneLBLocationMap(FileStatus fSta, //if replace is false, rename (mv) actually move the src under dest dir //if destf is an existing file, rename is actually a replace, and do not need // to delete the file first - if (replace && !destIsSubDir) { + if (replace && !srcIsSubDirOfDest) { destFs.delete(destf, true); LOG.debug("The path " + destf.toString() + " is deleted"); } @@ -3192,13 +3192,17 @@ private void constructOneLBLocationMap(FileStatus fSta, replace, // overwrite destination conf); } else { - if (destIsSubDir) { + if (srcIsSubDirOfDest || destIsSubDirOfSrc) { FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); List<Future<Void>> futures = new LinkedList<>(); final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + if (destIsSubDirOfSrc && !destFs.exists(destf)) { + Utilities.LOG14535.info("Creating " + destf); + destFs.mkdirs(destf); + } /* Move files one by one because source is a subdirectory of destination */ for (final FileStatus srcStatus : srcs) { http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index ea90889..d3466fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -986,4 +986,7 @@ public class Table implements Serializable { return deserializer != null; } + public void setMmNextWriteId(long writeId) { + this.tTable.setMmNextWriteId(writeId); + } }; http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java index da261bb..25836c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java @@ -409,7 +409,7 @@ public class BucketingSortingReduceSinkOptimizer extends Transform { if(stack.get(0) instanceof TableScanOperator) { TableScanOperator tso = ((TableScanOperator)stack.get(0)); - if(AcidUtils.isAcidTable(tso.getConf().getTableMetadata())) { + if(AcidUtils.isFullAcidTable(tso.getConf().getTableMetadata())) { /*ACID tables have complex directory layout and require merging of delta files * on read thus we should not try to read bucket files directly*/ return null; http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java index 17510e9..5a9c72f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java @@ -319,7 +319,7 @@ public class StatsOptimizer extends Transform { } Table tbl = tsOp.getConf().getTableMetadata(); - if (AcidUtils.isAcidTable(tbl)) { + if (AcidUtils.isFullAcidTable(tbl)) { Logger.info("Table " + tbl.getTableName() + " is ACID table. Skip StatsOptimizer."); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 3e016f3..e6a31e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SQLForeignKey; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SkewedInfo; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; @@ -72,7 +73,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.PKInfo; import org.apache.hadoop.hive.ql.parse.authorization.AuthorizationParseUtils; import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactory; import org.apache.hadoop.hive.ql.parse.authorization.HiveAuthorizationTaskFactoryImpl; @@ -184,6 +184,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { private final Set<String> reservedPartitionValues; private final HiveAuthorizationTaskFactory hiveAuthorizationTaskFactory; + private WriteEntity alterTableOutput; static { TokenToTypeName.put(HiveParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME); @@ -674,6 +675,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } private void analyzeShowRoles(ASTNode ast) throws SemanticException { + @SuppressWarnings("unchecked") Task<DDLWork> roleDDLTask = (Task<DDLWork>) hiveAuthorizationTaskFactory .createShowRolesTask(ast, ctx.getResFile(), getInputs(), getOutputs()); @@ -1413,10 +1415,11 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { alterTblDesc.setEnvironmentContext(environmentContext); alterTblDesc.setOldName(tableName); - addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc); + boolean isPotentialMmSwitch = mapProp.containsKey(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL) + || mapProp.containsKey(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES); + addInputsOutputsAlterTable(tableName, partSpec, alterTblDesc, isPotentialMmSwitch); - rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), - alterTblDesc), conf)); + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc), conf)); } private void 
analyzeAlterTableSerdeProps(ASTNode ast, String tableName, @@ -1476,16 +1479,21 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { private void addInputsOutputsAlterTable(String tableName, Map<String, String> partSpec, AlterTableTypes op) throws SemanticException { - addInputsOutputsAlterTable(tableName, partSpec, null, op); + addInputsOutputsAlterTable(tableName, partSpec, null, op, false); + } + + private void addInputsOutputsAlterTable(String tableName, Map<String, String> partSpec, + AlterTableDesc desc, boolean doForceExclusive) throws SemanticException { + addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp(), doForceExclusive); } private void addInputsOutputsAlterTable(String tableName, Map<String, String> partSpec, AlterTableDesc desc) throws SemanticException { - addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp()); + addInputsOutputsAlterTable(tableName, partSpec, desc, desc.getOp(), false); } private void addInputsOutputsAlterTable(String tableName, Map<String, String> partSpec, - AlterTableDesc desc, AlterTableTypes op) throws SemanticException { + AlterTableDesc desc, AlterTableTypes op, boolean doForceExclusive) throws SemanticException { boolean isCascade = desc != null && desc.getIsCascade(); boolean alterPartitions = partSpec != null && !partSpec.isEmpty(); //cascade only occurs at table level then cascade to partition level @@ -1496,11 +1504,13 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { Table tab = getTable(tableName, true); // Determine the lock type to acquire - WriteEntity.WriteType writeType = WriteEntity.determineAlterTableWriteType(op); + WriteEntity.WriteType writeType = doForceExclusive + ? WriteType.DDL_EXCLUSIVE : WriteEntity.determineAlterTableWriteType(op); if (!alterPartitions) { inputs.add(new ReadEntity(tab)); - outputs.add(new WriteEntity(tab, writeType)); + alterTableOutput = new WriteEntity(tab, writeType); + outputs.add(alterTableOutput); //do not need the lock for partitions since they are covered by the table lock if (isCascade) { for (Partition part : getPartitions(tab, partSpec, false)) { http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 35cfcd9..e8b2b84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -1946,7 +1946,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // Disallow INSERT INTO on bucketized tables - boolean isAcid = AcidUtils.isAcidTable(tab); + boolean isAcid = AcidUtils.isFullAcidTable(tab); boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName()); if (isTableWrittenTo && tab.getNumBuckets() > 0 && !isAcid) { @@ -6471,7 +6471,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z'); } input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(), - maxReducers, (AcidUtils.isAcidTable(dest_tab) ? + maxReducers, (AcidUtils.isFullAcidTable(dest_tab) ? 
getAcidType(dest_tab, table_desc.getOutputFileFormatClass()) : AcidUtils.Operation.NOT_ACID)); reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0)); ctx.setMultiFileSpray(multiFileSpray); @@ -6557,7 +6557,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { case QBMetaData.DEST_TABLE: { dest_tab = qbm.getDestTableForAlias(dest); - destTableIsAcid = AcidUtils.isAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab); destTableIsTemporary = dest_tab.isTemporary(); // Is the user trying to insert into a external tables @@ -6633,7 +6633,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dest_part = qbm.getDestPartitionForAlias(dest); dest_tab = dest_part.getTable(); - destTableIsAcid = AcidUtils.isAcidTable(dest_tab); + destTableIsAcid = AcidUtils.isFullAcidTable(dest_tab); checkExternalTable(dest_tab); @@ -6702,7 +6702,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { field_schemas = new ArrayList<FieldSchema>(); destTableIsTemporary = tblDesc.isTemporary(); destTableIsMaterialization = tblDesc.isMaterialization(); - if (MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps())) { + if (!destTableIsTemporary && MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps())) { isMmTable = isMmCtas = true; // TODO# this should really get current ACID txn; assuming ACID works correctly the txn // should have been opened to create the ACID table. For now use the first ID. @@ -11429,7 +11429,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (p != null) { tbl = p.getTable(); } - if (tbl != null && AcidUtils.isAcidTable(tbl)) { + if (tbl != null && AcidUtils.isFullAcidTable(tbl)) { acidInQuery = true; checkAcidTxnManager(tbl); } @@ -11492,7 +11492,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { tbl = writeEntity.getTable(); } - if (tbl != null && AcidUtils.isAcidTable(tbl)) { + if (tbl != null && AcidUtils.isFullAcidTable(tbl)) { acidInQuery = true; checkAcidTxnManager(tbl); } http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index ebe613e..65b6813 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -128,7 +128,7 @@ public class TableScanDesc extends AbstractOperatorDesc { this.alias = alias; this.virtualCols = vcs; this.tableMetadata = tblMetadata; - isAcidTable = AcidUtils.isAcidTable(this.tableMetadata); + isAcidTable = AcidUtils.isFullAcidTable(this.tableMetadata); if (isAcidTable) { acidOperationalProperties = AcidUtils.getAcidOperationalProperties(this.tableMetadata); } http://git-wip-us.apache.org/repos/asf/hive/blob/e083d33a/ql/src/test/queries/clientpositive/mm_conversions.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_conversions.q b/ql/src/test/queries/clientpositive/mm_conversions.q new file mode 100644 index 0000000..4fcaf73 --- /dev/null +++ b/ql/src/test/queries/clientpositive/mm_conversions.q @@ -0,0 +1,85 @@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.fetch.task.conversion=none; +set tez.grouping.min-size=1; +set tez.grouping.max-size=2; +set 
hive.exec.dynamic.partition.mode=nonstrict; + + +-- Force multiple writers when reading +drop table intermediate; +create table intermediate(key int) partitioned by (p int) stored as orc; +insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 1; +insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 1; +insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 1; + +drop table simple_from_mm; +create table simple_from_mm(key int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +insert into table simple_from_mm select key from intermediate; +insert into table simple_from_mm select key from intermediate; +select * from simple_from_mm; +alter table simple_from_mm unset tblproperties('transactional_properties', 'transactional'); +select * from simple_from_mm; +insert into table simple_from_mm select key from intermediate; +select * from simple_from_mm; +alter table simple_from_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from simple_from_mm; +insert into table simple_from_mm select key from intermediate; +select * from simple_from_mm; +alter table simple_from_mm set tblproperties("transactional"="false", 'transactional_properties'='false'); +select * from simple_from_mm; +insert into table simple_from_mm select key from intermediate; +select * from simple_from_mm; +drop table simple_from_mm; + +drop table simple_to_mm; +create table simple_to_mm(key int) stored as orc; +insert into table simple_to_mm select key from intermediate; +insert into table simple_to_mm select key from intermediate; +select * from simple_to_mm; +alter table simple_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from simple_to_mm; +insert into table simple_to_mm select key from intermediate; +insert into table simple_to_mm select key from intermediate; +select * from simple_to_mm; +drop table simple_to_mm; + +drop table part_from_mm; +create table part_from_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +insert into table part_from_mm partition(key_mm='455') select key from intermediate; +insert into table part_from_mm partition(key_mm='455') select key from intermediate; +insert into table part_from_mm partition(key_mm='456') select key from intermediate; +select * from part_from_mm; +alter table part_from_mm unset tblproperties('transactional_properties', 'transactional'); +select * from part_from_mm; +insert into table part_from_mm partition(key_mm='456') select key from intermediate; +insert into table part_from_mm partition(key_mm='457') select key from intermediate; +select * from part_from_mm; +alter table part_from_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from part_from_mm; +insert into table part_from_mm partition(key_mm='456') select key from intermediate; +insert into table part_from_mm partition(key_mm='455') select key from intermediate; +select * from part_from_mm; +alter table part_from_mm set tblproperties("transactional"="false", 'transactional_properties'='false'); +select * from part_from_mm; +insert into table part_from_mm partition(key_mm='457') select key from intermediate; +select * from part_from_mm; +drop 
table part_from_mm; + +drop table part_to_mm; +create table part_to_mm(key int) partitioned by (key_mm int) stored as orc; +insert into table part_to_mm partition(key_mm='455') select key from intermediate; +insert into table part_to_mm partition(key_mm='456') select key from intermediate; +select * from part_to_mm; +alter table part_to_mm set tblproperties("transactional"="true", "transactional_properties"="insert_only"); +select * from part_to_mm; +insert into table part_to_mm partition(key_mm='456') select key from intermediate; +insert into table part_to_mm partition(key_mm='457') select key from intermediate; +select * from part_to_mm; +drop table part_to_mm; + + + + + +drop table intermediate;
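
The new mm_conversions.q above drives both directions of the conversion on flat and partitioned tables and is paired with the regenerated llap/mm_conversions.q.out golden file. Under the hood, setting the MM properties routes through DDLTask.generateAddMmTasks (move existing files under the first mm_ write-id directory, then commit that write ID via ImportCommitTask), while unsetting them routes through generateRemoveMmTasks/handleRemoveMm (rename committed mm_ directories to drop the prefix, delete anything unknown or uncommitted). The following is a toy, local-filesystem sketch of that remove-side cleanup, assuming an "mm_<writeId>" directory naming scheme and using java.nio instead of Hadoop's FileSystem and MoveTask machinery; it illustrates the idea rather than reproducing the patch.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

public class RemoveMmSketch {

  /**
   * Drops the MM layout under tableDir: committed mm_<id> directories lose the
   * prefix, everything else is deleted.
   */
  static void stripMmLayout(Path tableDir, Set<Long> committedWriteIds) throws IOException {
    try (DirectoryStream<Path> children = Files.newDirectoryStream(tableDir)) {
      for (Path child : children) {
        String name = child.getFileName().toString();
        Long writeId = Files.isDirectory(child) ? parseWriteId(name) : null;
        if (writeId == null || !committedWriteIds.contains(writeId)) {
          // Stray file, unknown directory, or uncommitted write: safe to remove.
          deleteRecursively(child);
        } else {
          // Committed data: drop the prefix (mm_<id> -> <id>), mirroring the
          // substring-based rename the patch hands to MoveTask via LoadMultiFilesDesc.
          Files.move(child, child.resolveSibling(name.substring("mm_".length())));
        }
      }
    }
  }

  /** Returns the write ID from an assumed mm_<number> name, or null if it does not match. */
  static Long parseWriteId(String dirName) {
    if (!dirName.startsWith("mm_")) {
      return null;
    }
    try {
      return Long.parseLong(dirName.substring("mm_".length()));
    } catch (NumberFormatException e) {
      return null;
    }
  }

  static void deleteRecursively(Path path) throws IOException {
    if (Files.isDirectory(path)) {
      try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {
        for (Path entry : entries) {
          deleteRecursively(entry);
        }
      }
    }
    Files.delete(path);
  }
}

The real implementation defers the renames to a MoveTask that runs only after the metastore alteration commits: as the comment in generateRemoveMmTasks explains, deleting uncommitted data is safe, but moving data before the conversion commits would risk data loss.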
