http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 0000000,a1187c4..6354f77 mode 000000,100644..100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@@ -1,0 -1,243 +1,245 @@@ + /* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; + + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.metastore.MetaStoreUtils; + import org.apache.hadoop.hive.metastore.TableType; + import org.apache.hadoop.hive.metastore.api.Database; + import org.apache.hadoop.hive.metastore.api.MetaException; + import org.apache.hadoop.hive.ql.ErrorMsg; + import org.apache.hadoop.hive.ql.exec.ReplCopyTask; + import org.apache.hadoop.hive.ql.exec.Task; + import org.apache.hadoop.hive.ql.exec.TaskFactory; + import org.apache.hadoop.hive.ql.exec.Utilities; + import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; + import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; + import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; + import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; + import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; + import org.apache.hadoop.hive.ql.metadata.Table; + import org.apache.hadoop.hive.ql.parse.EximUtil; + import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; + import org.apache.hadoop.hive.ql.parse.ReplicationSpec; + import org.apache.hadoop.hive.ql.parse.SemanticException; + import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; + import org.apache.hadoop.hive.ql.plan.ImportTableDesc; + import org.apache.hadoop.hive.ql.plan.LoadTableDesc; + import org.apache.hadoop.hive.ql.plan.MoveWork; ++import org.apache.hadoop.hive.ql.session.SessionState; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + import java.io.Serializable; + import java.util.ArrayList; + import java.util.HashSet; + import java.util.List; + import java.util.TreeMap; + + import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; + + public class LoadTable { + private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class); + // private final Helper helper; + private final Context context; + private final ReplLogger replLogger; + private final TableContext tableContext; + private final TaskTracker tracker; + private final TableEvent event; + + public LoadTable(TableEvent event, Context 
context, ReplLogger replLogger, + TableContext tableContext, TaskTracker limiter) + throws SemanticException, IOException { + this.event = event; + this.context = context; + this.replLogger = replLogger; + this.tableContext = tableContext; + this.tracker = new TaskTracker(limiter); + } + + private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); + ReplLoadTask.dependency(tracker.tasks(), replLogTask); + + if (tracker.tasks().isEmpty()) { + tracker.addTask(replLogTask); + } else { + ReplLoadTask.dependency(tracker.tasks(), replLogTask); + + List<Task<? extends Serializable>> visited = new ArrayList<>(); + tracker.updateTaskCount(replLogTask, visited); + } + } + + public TaskTracker tasks() throws SemanticException { + // Path being passed to us is a table dump location. We go ahead and load it in as needed. + // If tblName is null, then we default to the table name specified in _metadata, which is good. + // or are both specified, in which case, that's what we are intended to create the new table as. + try { + if (event.shouldNotReplicate()) { + return tracker; + } + String dbName = tableContext.dbNameToLoadIn; //this can never be null or empty; + // Create table associated with the import + // Executed if relevant, and used to contain all the other details about the table if not. + ImportTableDesc tableDesc = tableContext.overrideProperties(event.tableDesc(dbName)); + Table table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb); + ReplicationSpec replicationSpec = event.replicationSpec(); + + // Normally, on import, trying to create a table or a partition in a db that does not yet exist + // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying + // to create tasks to create a table inside a db that as-of-now does not exist, but there is + // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate + // defaults and do not error out in that case. + // the above will change now since we are going to split replication load in multiple execution + // tasks and hence we could have created the database earlier in which case the waitOnPrecursor will + // be false and hence if db Not found we should error out. + Database parentDb = context.hiveDb.getDatabase(tableDesc.getDatabaseName()); + if (parentDb == null) { + if (!tableContext.waitOnPrecursor()) { + throw new SemanticException( + ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tableDesc.getDatabaseName())); + } + } + + if (table == null) { + // If table doesn't exist, allow creating a new one only if the database state is older than the update. + if ((parentDb != null) && (!replicationSpec + .allowReplacementInto(parentDb.getParameters()))) { + // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + return tracker; + } + } else { + if (!replicationSpec.allowReplacementInto(table.getParameters())) { + // If the target table exists and is newer or same as current update based on repl.last.id, then just noop it. + return tracker; + } + } + + if (tableDesc.getLocation() == null) { + tableDesc.setLocation(location(tableDesc, parentDb)); + } + + + /* Note: In the following section, Metadata-only import handling logic is + interleaved with regular repl-import logic. 
The rule of thumb being + followed here is that MD-only imports are essentially ALTERs. They do + not load data, and should not be "creating" any metadata - they should + be replacing instead. The only place it makes sense for a MD-only import + to create is in the case of a table that's been dropped and recreated, + or in the case of an unpartitioned table. In all other cases, it should + behave like a noop or a pure MD alter. + */ + if (table == null) { + newTableTasks(tableDesc); + } else { + existingTableTasks(tableDesc, table, replicationSpec); + } + + if (!isPartitioned(tableDesc)) { + createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType()); + } + return tracker; + } catch (Exception e) { + throw new SemanticException(e); + } + } + + private void existingTableTasks(ImportTableDesc tblDesc, Table table, + ReplicationSpec replicationSpec) { + if (!table.isPartitioned()) { + + LOG.debug("table non-partitioned"); + if (!replicationSpec.allowReplacementInto(table.getParameters())) { + return; // silently return, table is newer than our replacement. + } + + Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, replicationSpec); + if (replicationSpec.isMetadataOnly()) { + tracker.addTask(alterTableTask); + } else { + Task<?> loadTableTask = + loadTableTask(table, replicationSpec, event.metadataPath(), event.metadataPath()); + alterTableTask.addDependentTask(loadTableTask); + tracker.addTask(alterTableTask); + } + } + } + + private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException { + Table table; + table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); + // Either we're dropping and re-creating, or the table didn't exist, and we're creating. + Task<?> createTableTask = + tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + if (event.replicationSpec().isMetadataOnly()) { + tracker.addTask(createTableTask); + return; + } + if (!isPartitioned(tblDesc)) { + LOG.debug("adding dependent CopyWork/MoveWork for table"); + Task<?> loadTableTask = + loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()), + event.metadataPath()); + createTableTask.addDependentTask(loadTableTask); + } + tracker.addTask(createTableTask); + } + + private String location(ImportTableDesc tblDesc, Database parentDb) + throws MetaException, SemanticException { + if (!tableContext.waitOnPrecursor()) { + return context.warehouse.getDefaultTablePath(parentDb, tblDesc.getTableName()).toString(); + } else { + Path tablePath = new Path( + context.warehouse.getDefaultDatabasePath(tblDesc.getDatabaseName()), + MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase()) + ); + return context.warehouse.getDnsPath(tablePath).toString(); + } + } + + private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath, + Path fromURI) { + Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME); + Path tmpPath = context.utils.getExternalTmpPath(tgtPath); + Task<?> copyTask = + ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); + + LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() ++ tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace(), ++ SessionState.get().getTxnMgr().getCurrentTxnId() + ); + MoveWork moveWork = + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + Task<?> loadTableTask = TaskFactory.get(moveWork, 
context.hiveConf); + copyTask.addDependentTask(loadTableTask); + return copyTask; + } + + private Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc, + ReplicationSpec replicationSpec) { + tableDesc.setReplaceMode(true); + if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { + tableDesc.setReplicationSpec(replicationSpec); + } + return tableDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); + } + }
http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index cfa4e2c,90d1372..b5733ec --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@@ -145,12 -162,18 +162,18 @@@ public class VectorizedRowBatchCtx public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf, FileSplit split, Object[] partitionValues) throws IOException { + // TODO: this is invalid for SMB. Keep this for now for legacy reasons. See the other overload. + MapWork mapWork = Utilities.getMapWork(hiveConf); + getPartitionValues(vrbCtx, mapWork, split, partitionValues); + } - Map<Path, PartitionDesc> pathToPartitionInfo = Utilities - .getMapWork(hiveConf).getPathToPartitionInfo(); + public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, + MapWork mapWork, FileSplit split, Object[] partitionValues) + throws IOException { + Map<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); PartitionDesc partDesc = HiveFileFormatUtils - .getPartitionDescFromPathRecursively(pathToPartitionInfo, + .getFromPathRecursively(pathToPartitionInfo, split.getPath(), IOPrepareCache.get().getPartitionDescMap()); getPartitionValues(vrbCtx, partDesc, partitionValues); http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index f325c0e,feacdd8..9c9a2d3 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@@ -36,9 -37,9 +37,10 @@@ import org.apache.hadoop.hive.conf.Hive import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; + import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; @@@ -345,11 -384,7 +385,10 @@@ public class AcidUtils public static final String SPLIT_UPDATE_STRING = "split_update"; public static final int HASH_BASED_MERGE_BIT = 0x02; public static final String HASH_BASED_MERGE_STRING = "hash_merge"; + public static final int INSERT_ONLY_BIT = 0x04; + public static final String INSERT_ONLY_STRING = "insert_only"; public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY; - public static final String LEGACY_VALUE_STRING = TransactionalValidationListener.LEGACY_TRANSACTIONAL_PROPERTY; + public static final String INSERTONLY_VALUE_STRING = TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY; private AcidOperationalProperties() { } @@@ -401,12 -414,6 +430,9 @@@ if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) { return AcidOperationalProperties.getDefault(); } - if (propertiesStr.equalsIgnoreCase(LEGACY_VALUE_STRING)) { - 
return AcidOperationalProperties.getLegacy(); - } + if (propertiesStr.equalsIgnoreCase(INSERTONLY_VALUE_STRING)) { + return AcidOperationalProperties.getInsertOnly(); + } AcidOperationalProperties obj = new AcidOperationalProperties(); String[] options = propertiesStr.split("\\|"); for (String option : options) { http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index a70fde6,38aaeed..6178de2 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@@ -14,16 -14,12 +14,20 @@@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.common.io.DataCache; + import org.apache.hadoop.hive.common.io.FileMetadataCache; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; + import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface; +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.Utilities; http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index 01e8a48,5dec791..82804b2 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@@ -423,11 -423,19 +423,19 @@@ public final class DbTxnManager extend compBuilder.setExclusive(); compBuilder.setOperationType(DataOperationType.NO_TXN); break; - + case INSERT_OVERWRITE: + t = getTable(output); + if (AcidUtils.isAcidTable(t)) { + compBuilder.setSemiShared(); + compBuilder.setOperationType(DataOperationType.UPDATE); + } else { + compBuilder.setExclusive(); + 
compBuilder.setOperationType(DataOperationType.NO_TXN); + } + break; case INSERT: assert t != null; - if(AcidUtils.isAcidTable(t)) { + if(AcidUtils.isFullAcidTable(t)) { compBuilder.setShared(); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index ae451e2,d661f10..a0b735c --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@@ -65,9 -62,9 +65,10 @@@ import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; + import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.classification.InterfaceAudience.LimitedPrivate; @@@ -3338,8 -3250,9 +3389,9 @@@ private void constructOneLBLocationMap( // (1) Do not delete the dest dir before doing the move operation. // (2) It is assumed that subdir and dir are in same encryption zone. // (3) Move individual files from scr dir to dest dir. - boolean destIsSubDir = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal); + boolean srcIsSubDirOfDest = isSubDir(srcf, destf, srcFs, destFs, isSrcLocal), + destIsSubDirOfSrc = isSubDir(destf, srcf, destFs, srcFs, false); + final String msg = "Unable to move source " + srcf + " to destination " + destf; - try { if (replace) { try{ http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 7fd8f04,a054abb..021507f --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@@ -1507,9 -1517,14 +1530,15 @@@ public class DDLSemanticAnalyzer extend } Table tab = getTable(tableName, true); + // cascade only occurs with partitioned table + if (isCascade && !tab.isPartitioned()) { + throw new SemanticException( + ErrorMsg.ALTER_TABLE_NON_PARTITIONED_TABLE_CASCADE_NOT_SUPPORTED); + } + // Determine the lock type to acquire - WriteEntity.WriteType writeType = WriteEntity.determineAlterTableWriteType(op); + WriteEntity.WriteType writeType = doForceExclusive + ? 
WriteType.DDL_EXCLUSIVE : WriteEntity.determineAlterTableWriteType(op); if (!alterPartitions) { inputs.add(new ReadEntity(tab)); http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 2401036,40c34bf..4109de4 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@@ -140,15 -141,15 +141,16 @@@ public class EximUtil * Initialize the URI where the exported data collection is * to created for export, or is present for import */ - static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException { + public static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException { try { - boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); + boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE) + || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE); URI uri = new Path(dcPath).toUri(); - String scheme = uri.getScheme(); + FileSystem fs = FileSystem.get(uri, conf); + // Get scheme from FileSystem + String scheme = fs.getScheme(); String authority = uri.getAuthority(); String path = uri.getPath(); - FileSystem fs = FileSystem.get(uri, conf); LOG.info("Path before norm :" + path); // generate absolute path relative to home directory http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index e534272,b8c6ea9..54ee7ae --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@@ -18,42 -18,12 +18,46 @@@ package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.util.HashSet; +import java.util.List; + +import org.antlr.runtime.tree.Tree; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; + import org.antlr.runtime.tree.Tree; + import org.apache.hadoop.hive.ql.ErrorMsg; + import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import 
org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CopyWork; +import org.slf4j.Logger; + import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; /** * ExportSemanticAnalyzer. http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 5ac15af,7f3460f..387582b --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@@ -62,12 -59,10 +62,12 @@@ import org.apache.hadoop.hive.ql.metada import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; + import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; +import org.apache.hadoop.hive.ql.plan.CopyWork; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; - import org.apache.hadoop.hive.ql.plan.DropTableDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.session.SessionState; @@@ -223,9 -228,8 +234,9 @@@ public class ImportSemanticAnalyzer ext } catch (Exception e) { throw new HiveException(e); } + boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){ + if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ tblDesc.setReplicationSpec(replicationSpec); } @@@ -318,8 -306,8 +324,8 @@@ } else { createReplImportTasks( tblDesc, partitionDescs, - isPartSpecSet, replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x, txnId, stmtId, isSourceMm); + replicationSpec, waitOnPrecursor, table, - fromURI, fs, wh, x, updatedMetadata); ++ fromURI, fs, wh, x, txnId, stmtId, isSourceMm, updatedMetadata); } return tableExists; } @@@ -800,8 -734,8 +799,8 @@@ // ensure if destination is not empty only for regular import Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); - checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec,x); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, txnId, stmtId, isSourceMm); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@@ -838,14 -769,8 +837,14 @@@ tablePath = wh.getDefaultTablePath(parentDb, tblDesc.getTableName()); } FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); - checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x); + checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); - t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x)); + if (isSourceMm) { // since target table doesn't exist, it 
should inherit soruce table's properties + Map<String, String> tblproperties = table.getParameters(); + tblproperties.put("transactional", "true"); + tblproperties.put("transactional_properties", "insert_only"); + table.setParameters(tblproperties); + } + t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, txnId, stmtId, isSourceMm)); } } x.getTasks().add(t); @@@ -866,29 -783,14 +865,15 @@@ private static void createReplImportTasks( ImportTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, - boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnPrecursor, + ReplicationSpec replicationSpec, boolean waitOnPrecursor, Table table, URI fromURI, FileSystem fs, Warehouse wh, - EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm) - EximUtil.SemanticAnalyzerWrapperContext x, ++ EximUtil.SemanticAnalyzerWrapperContext x, Long txnId, int stmtId, boolean isSourceMm, + UpdatedMetaDataTracker updatedMetadata) throws HiveException, URISyntaxException, IOException, MetaException { + Task<?> dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; - if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){ - // If destination table exists, but is partitioned, and we think we're writing to an unpartitioned - // or if destination table exists, but is unpartitioned and we think we're writing to a partitioned - // table, then this can only happen because there are drops in the queue that are yet to be processed. - // So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we - // drop and re-create. - if (replicationSpec.allowReplacementInto(table)){ - dr = dropTableTask(table, x); - lockType = WriteEntity.WriteType.DDL_EXCLUSIVE; - table = null; // null it out so we go into the table re-create flow. - } else { - return; // noop out of here. - } - } - // Normally, on import, trying to create a table or a partition in a db that does not yet exist // is a error condition. However, in the case of a REPL LOAD, it is possible that we are trying // to create tasks to create a table inside a db that as-of-now does not exist, but there is @@@ -942,21 -863,18 +949,18 @@@ for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, txnId, stmtId, isSourceMm)); } } - if (dr == null){ - // Simply create - x.getTasks().add(t); - } else { - // Drop and recreate - dr.addDependentTask(t); - x.getTasks().add(dr); - } + // Simply create + x.getTasks().add(t); } else { // Table existed, and is okay to replicate into, not dropping and re-creating. 
if (table.isPartitioned()) { @@@ -971,15 -887,18 +975,18 @@@ if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); + if (updatedMetadata != null) { + updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); + } } } else { // If replicating, then the partition already existing means we need to replace, maybe, if // the destination ptn's repl.last.id is older than the replacement's. - if (replicationSpec.allowReplacementInto(ptn)){ + if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, txnId, stmtId, isSourceMm, ict)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@@ -1002,13 -921,9 +1009,10 @@@ } } else { x.getLOG().debug("table non-partitioned"); - if (!replicationSpec.allowReplacementInto(table)){ - return; // silently return, table is newer than our replacement. - } if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into - loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), replicationSpec, x); + loadTable(fromURI, table, replicationSpec.isReplace(), new Path(fromURI), + replicationSpec, x, txnId, stmtId, isSourceMm); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index 9b39cd0,fa79700..5e70863 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@@ -227,6 -228,9 +229,9 @@@ public class LoadSemanticAnalyzer exten + " and use 'insert... select' to allow Hive to enforce bucketing. 
" + error); } - if(AcidUtils.isAcidTable(ts.tableHandle)) { ++ if(AcidUtils.isAcidTable(ts.tableHandle) && !MetaStoreUtils.isInsertOnlyTable(ts.tableHandle.getParameters())) { + throw new SemanticException(ErrorMsg.LOAD_DATA_ON_ACID_TABLE, ts.tableHandle.getCompleteName()); + } // make sure the arguments make sense List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 19882eb,6f379da..ab3cbe1 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@@ -6851,17 -6896,13 +6855,21 @@@ public class SemanticAnalyzer extends B AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); - checkAcidConstraints(qb, table_desc, dest_tab, acidOp); + checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp); + if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) { + acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + } + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + } + boolean isReplace = !qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName()); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId); + // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old + // deltas and base and leave them up to the cleaner to clean up + ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), + dest_tab.getTableName()) && !destTableIsAcid); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); } else { @@@ -6915,17 -7011,13 +6923,19 @@@ AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsAcid) { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); - checkAcidConstraints(qb, table_desc, dest_tab, acidOp); + checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp); + if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) { + acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + } + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); + } + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId); + // For Acid table, Insert Overwrite shouldn't replace the table content. 
We keep the old + // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), - dest_tab.getTableName())); + dest_tab.getTableName()) && !destTableIsAcid); ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); @@@ -7091,122 -7226,18 +7101,122 @@@ genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx); } - FileSinkDesc fileSinkDesc = new FileSinkDesc( - queryTmpdir, - table_desc, - conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), - currentTableId, - rsCtx.isMultiFileSpray(), - canBeMerged, - rsCtx.getNumFiles(), - rsCtx.getTotalFiles(), - rsCtx.getPartnCols(), - dpCtx, - dest_path); + FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, + dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, + destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, txnId, isMmCtas); ++ canBeMerged, dest_tab, txnId, isMmCtas); + if (isMmCtas) { + // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. + tableDesc.setWriter(fileSinkDesc); + } + + if (SessionState.get().isHiveServerQuery() && + null != table_desc && + table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) && + HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) { + fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true); + } else { + fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false); + } + + Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( + fileSinkDesc, fsRS, input), inputRR); + + handleLineage(ltd, output); + + if (LOG.isDebugEnabled()) { + LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " + + dest_path + " row schema: " + inputRR.toString()); + } + + FileSinkOperator fso = (FileSinkOperator) output; + fso.getConf().setTable(dest_tab); + // the following code is used to collect column stats when + // hive.stats.autogather=true + // and it is an insert overwrite or insert into table + if (dest_tab != null && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) + && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) + && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { + if (dest_type.intValue() == QBMetaData.DEST_TABLE) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() + .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) { + genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb + .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); + + } + } + return output; + } + + private ColsAndTypes deriveFileSinkColTypes( + RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException { + ColsAndTypes result = new ColsAndTypes("", ""); + ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos(); + boolean first = true; + for (ColumnInfo colInfo : colInfos) { + String[] nm = inputRR.reverseLookup(colInfo.getInternalName()); + + if (nm[1] != null) { // non-null column alias + colInfo.setAlias(nm[1]); + } + + String colName = colInfo.getInternalName(); //default column name + if (field_schemas != null) { + FieldSchema col = new FieldSchema(); + if (!("".equals(nm[0])) && nm[1] != null) { + colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove `` + } + colName = fixCtasColumnName(colName); + col.setName(colName); + String typeName = colInfo.getType().getTypeName(); + // 
CTAS should NOT create a VOID type + if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) { + throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName)); + } + col.setType(typeName); + field_schemas.add(col); + } + + if (!first) { + result.cols = result.cols.concat(","); + result.colTypes = result.colTypes.concat(":"); + } + + first = false; + result.cols = result.cols.concat(colName); + + // Replace VOID type with string when the output is a temp table or + // local files. + // A VOID type can be generated under the query: + // + // select NULL from tt; + // or + // insert overwrite local directory "abc" select NULL from tt; + // + // where there is no column type to which the NULL value should be + // converted. + // + String tName = colInfo.getType().getTypeName(); + if (tName.equals(serdeConstants.VOID_TYPE_NAME)) { + result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME); + } else { + result.colTypes = result.colTypes.concat(tName); + } + } + return result; + } + + private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc, + Partition dest_part, Path dest_path, int currentTableId, + boolean destTableIsAcid, boolean destTableIsTemporary, + boolean destTableIsMaterialization, Path queryTmpdir, + SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, - RowSchema fsRS, boolean canBeMerged, Long mmWriteId, boolean isMmCtas) throws SemanticException { ++ RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas) throws SemanticException { + FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, + conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), + canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, + dest_path, mmWriteId, isMmCtas); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); @@@ -7448,14 -7395,12 +7456,9 @@@ */ conf.set(AcidUtils.CONF_ACID_KEY, "true"); - if (!Operation.NOT_ACID.equals(acidOp)) { - if (table.getNumBuckets() < 1) { - throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, table.getTableName()); - } - if (table.getSortCols() != null && table.getSortCols().size() > 0) { - throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName()); - } + if (table.getSortCols() != null && table.getSortCols().size() > 0) { + throw new SemanticException(ErrorMsg.ACID_NO_SORTED_BUCKETS, table.getTableName()); } - - - } /** http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java index d0a82af,4f8ba6f..ffc2daf --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java @@@ -355,6 -396,52 +396,52 @@@ public class GenSparkUtils } /** + * Create and add any dependent move tasks. + * + * This is forked from {@link GenMapRedUtils}. The difference is that it doesn't check + * 'isLinkedFileSink' and does not set parent dir for the linked file sinks. + */ + public static Path createMoveTask(Task<? 
extends Serializable> currTask, boolean chDir, + FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks, + HiveConf hconf, DependencyCollectionTask dependencyTask) { + + Path dest = null; ++ FileSinkDesc fileSinkDesc = fsOp.getConf(); + + if (chDir) { + dest = fsOp.getConf().getFinalDirName(); + + // generate the temporary file + // it must be on the same file system as the current destination + Context baseCtx = parseCtx.getContext(); + + Path tmpDir = baseCtx.getExternalTmpPath(dest); + - FileSinkDesc fileSinkDesc = fsOp.getConf(); + // Change all the linked file sink descriptors + if (fileSinkDesc.getLinkedFileSinkDesc() != null) { + for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) { + fsConf.setDirName(tmpDir); + } + } else { + fileSinkDesc.setDirName(tmpDir); + } + } + + Task<MoveWork> mvTask = null; + + if (!chDir) { - mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp); ++ mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(mvTasks, fileSinkDesc.getFinalDirName(), false); + } + + // Set the move task to be dependent on the current task + if (mvTask != null) { + GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask); + } + + return dest; + } + + /** * Populate partition pruning information from the pruning sink operator to the * target MapWork (the MapWork for the big table side). The information include the source table * name, column name, and partition key expression. It also set up the temporary path used to http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 955e0e5,a3df166..4732f0a --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@@ -20,9 -20,9 +20,10 @@@ package org.apache.hadoop.hive.ql.plan import java.util.ArrayList; import java.util.List; + import java.util.Objects; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.Explain.Level; http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --cc 
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 3600a6b,0f129fc..5a8ac31 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@@ -487,79 -439,7 +439,6 @@@ public class TestTxnCommands extends Te } } - /** - * takes raw data and turns it into a string as if from Driver.getResults() - * sorts rows in dictionary order - */ - private List<String> stringifyValues(int[][] rowsIn) { - assert rowsIn.length > 0; - int[][] rows = rowsIn.clone(); - Arrays.sort(rows, new RowComp()); - List<String> rs = new ArrayList<String>(); - for(int[] row : rows) { - assert row.length > 0; - StringBuilder sb = new StringBuilder(); - for(int value : row) { - sb.append(value).append("\t"); - } - sb.setLength(sb.length() - 1); - rs.add(sb.toString()); - } - return rs; - } - private static final class RowComp implements Comparator<int[]> { - @Override - public int compare(int[] row1, int[] row2) { - assert row1 != null && row2 != null && row1.length == row2.length; - for(int i = 0; i < row1.length; i++) { - int comp = Integer.compare(row1[i], row2[i]); - if(comp != 0) { - return comp; - } - } - return 0; - } - } - private String makeValuesClause(int[][] rows) { - assert rows.length > 0; - StringBuilder sb = new StringBuilder("values"); - for(int[] row : rows) { - assert row.length > 0; - if(row.length > 1) { - sb.append("("); - } - for(int value : row) { - sb.append(value).append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - if(row.length > 1) { - sb.append(")"); - } - sb.append(","); - } - sb.setLength(sb.length() - 1);//remove trailing comma - return sb.toString(); - } - - private List<String> runStatementOnDriver(String stmt) throws Exception { - LOG.info("Running " + stmt); - CommandProcessorResponse cpr = d.run(stmt); - if(cpr.getResponseCode() != 0) { - throw new RuntimeException(stmt + " failed: " + cpr); - } - List<String> rs = new ArrayList<String>(); - d.getResults(rs); - return rs; - } - private CommandProcessorResponse runStatementOnDriverNegative(String stmt) throws Exception { - CommandProcessorResponse cpr = d.run(stmt); - if(cpr.getResponseCode() != 0) { - return cpr; - } - throw new RuntimeException("Didn't get expected failure!"); - } -- - // @Ignore @Test public void exchangePartition() throws Exception { runStatementOnDriver("create database ex1"); http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --cc ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 7a73a17,21b4a2c..bd6e6a0 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@@ -85,16 -84,16 +84,17 @@@ public class TestTxnCommands2 protected HiveConf hiveConf; protected Driver d; - protected static enum Table { + protected enum Table { ACIDTBL("acidTbl"), - ACIDTBLPART("acidTblPart"), + ACIDTBLPART("acidTblPart", "p"), NONACIDORCTBL("nonAcidOrcTbl"), - NONACIDPART("nonAcidPart"), - NONACIDPART2("nonAcidPart2"), - ACIDNESTEDPART("acidNestedPart"), + NONACIDPART("nonAcidPart", "p"), + NONACIDPART2("nonAcidPart2", "p2"), - ACIDNESTEDPART("acidNestedPart", "p,q"); ++ ACIDNESTEDPART("acidNestedPart", "p,q"), + MMTBL("mmTbl"); private final String name; + private final String partitionColumns; @Override public String toString() { return name; @@@ -738,14 -841,13 +843,13 @@@ // There should be only 1 
directory left: base_0000001. // Original bucket files, delta directories and previous base directory should have been cleaned up. status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + - (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); Assert.assertEquals(1, status.length); - Assert.assertEquals("base_0000023", status[0].getPath().getName()); + Assert.assertEquals("base_0000025", status[0].getPath().getName()); FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); Arrays.sort(buckets); - Assert.assertEquals(BUCKET_COUNT, buckets.length); - Assert.assertEquals("bucket_00000", buckets[0].getPath().getName()); - Assert.assertEquals("bucket_00001", buckets[1].getPath().getName()); + Assert.assertEquals(1, buckets.length); + Assert.assertEquals("bucket_00001", buckets[0].getPath().getName()); rs = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL); resultData = new int[][] {{1, 3}, {3, 4}}; Assert.assertEquals(stringifyValues(resultData), rs); @@@ -1643,65 -1755,241 +1757,299 @@@ Assert.assertEquals(stringifyValues(rExpected), r); } + @Test + public void testBucketCodec() throws Exception { + d.destroy(); + //insert data in "legacy" format + hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 0); + d = new Driver(hiveConf); + + int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals)); + + d.destroy(); + hiveConf.setIntVar(HiveConf.ConfVars.TESTMODE_BUCKET_CODEC_VERSION, 1); + d = new Driver(hiveConf); + //do some operations with new format + runStatementOnDriver("update " + Table.ACIDTBL + " set b=11 where a in (5,7)"); + runStatementOnDriver("insert into " + Table.ACIDTBL + " values(11,11)"); + runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 7"); + + //make sure we get the right data back before/after compactions + List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + int[][] rExpected = {{2,1},{4,3},{5,11},{11,11}}; + Assert.assertEquals(stringifyValues(rExpected), r); + + runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MINOR'"); + runWorker(hiveConf); + + r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), r); + + runStatementOnDriver("ALTER TABLE " + Table.ACIDTBL + " COMPACT 'MAJOR'"); + runWorker(hiveConf); + + r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b"); + Assert.assertEquals(stringifyValues(rExpected), r); + } + /** + * Test the scenario when IOW comes in before a MAJOR compaction happens + * @throws Exception + */ + @Test + public void testInsertOverwrite1() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. Insert two rows to an ACID table + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. 
INSERT OVERWRITE + // Prepare data for the source table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)"); + // Insert overwrite ACID table from source table + runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus a base dir in the location + Assert.assertEquals(3, status.length); + boolean sawBase = false; + String baseDir = ""; + int deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + baseDir = dirName; + Assert.assertTrue(baseDir.matches("base_.*")); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertTrue(sawBase); + // Verify query result + List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + int [][] resultData = new int[][] {{5,6},{7,8}}; + Assert.assertEquals(stringifyValues(resultData), rs); + + // 3. Perform a major compaction. Nothing should change. Both deltas and base dirs should have the same name. + // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus a base dir in the location + Assert.assertEquals(3, status.length); + sawBase = false; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue(dirName.matches("base_.*")); + Assert.assertEquals(baseDir, dirName); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertTrue(sawBase); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. Run Cleaner. It should remove the 2 delta dirs. + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + Assert.assertEquals(baseDir, status[0].getPath().getName()); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + /** + * Test the scenario when IOW comes in after a MAJOR compaction happens + * @throws Exception + */ + @Test + public void testInsertOverwrite2() throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + FileStatus[] status; + + // 1. 
Insert two rows to an ACID table + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)"); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs in the location + Assert.assertEquals(2, status.length); + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + } + + // 2. Perform a major compaction. There should be an extra base dir now. + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus a base dir in the location + Assert.assertEquals(3, status.length); + boolean sawBase = false; + int deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + sawBase = true; + Assert.assertTrue(dirName.matches("base_.*")); + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertTrue(sawBase); + // Verify query result + int [][] resultData = new int[][] {{1,2},{3,4}}; + List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 3. INSERT OVERWRITE + // Prepare data for the source table + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) values(5,6),(7,8)"); + // Insert overwrite ACID table from source table + runStatementOnDriver("insert overwrite table " + Table.ACIDTBL + " select a,b from " + Table.NONACIDORCTBL); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 2 base dirs in the location + Assert.assertEquals(4, status.length); + int baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(2, baseCount); + // Verify query result + resultData = new int[][] {{5,6},{7,8}}; + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 4. Perform another major compaction. Nothing should change. Both deltas and both base dirs + // should have the same name. 
+ // Re-verify directory layout and query result by using the same logic as above + runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + runWorker(hiveConf); + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + // There should be 2 delta dirs, plus 2 base dirs in the location + Assert.assertEquals(4, status.length); + baseCount = 0; + deltaCount = 0; + for (int i = 0; i < status.length; i++) { + String dirName = status[i].getPath().getName(); + if (dirName.matches("delta_.*")) { + deltaCount++; + } else { + Assert.assertTrue(dirName.matches("base_.*")); + baseCount++; + } + } + Assert.assertEquals(2, deltaCount); + Assert.assertEquals(2, baseCount); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + + // 5. Run Cleaner. It should remove the 2 delta dirs and 1 old base dir. + runCleaner(hiveConf); + // There should be only 1 directory left: base_xxxxxxx. + // The delta dirs should have been cleaned up. + status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.ACIDTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, status.length); + Assert.assertTrue(status[0].getPath().getName().matches("base_.*")); + // Verify query result + rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a"); + Assert.assertEquals(stringifyValues(resultData), rs); + } + /** + * Test compaction for Micro-managed table + * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables + * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any + * @throws Exception + */ + @Test + public void testMmTableCompaction() throws Exception { + // 1. Insert some rows into MM table + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)"); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)"); + // There should be 2 delta directories + verifyDirAndResult(2); + + // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + verifyDirAndResult(2); + + // 3. Let a transaction be aborted + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true); + runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)"); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false); + // There should be 3 delta directories. The new one is the aborted one. + verifyDirAndResult(3); + + // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction. + runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'"); + runWorker(hiveConf); + // The worker should remove the subdir for aborted transaction + verifyDirAndResult(2); + + // 5. Run Cleaner. Shouldn't impact anything. 
+ runCleaner(hiveConf); + verifyDirAndResult(2); + } + + private void verifyDirAndResult(int expectedDeltas) throws Exception { + FileSystem fs = FileSystem.get(hiveConf); + // Verify the content of subdirs + FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + + (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); + int sawDeltaTimes = 0; + for (int i = 0; i < status.length; i++) { + Assert.assertTrue(status[i].getPath().getName().matches("delta_.*")); + sawDeltaTimes++; + FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER); + Assert.assertEquals(1, files.length); + Assert.assertTrue(files[0].getPath().getName().equals("000000_0")); + } + Assert.assertEquals(expectedDeltas, sawDeltaTimes); + + // Verify query result + int [][] resultData = new int[][] {{1,2}, {3,4}}; + List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL); + Assert.assertEquals(stringifyValues(resultData), rs); + } + + /** * takes raw data and turns it into a string as if from Driver.getResults() * sorts rows in dictionary order */ http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java ---------------------------------------------------------------------- diff --cc ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index 6b19eb1,4c30732..ccd7d8e --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@@ -40,10 -46,15 +46,7 @@@ import java.util.TreeSet import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.FSDataInputStream; --import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FSInputStream; --import org.apache.hadoop.fs.FileStatus; --import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; --import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; ++import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.type.HiveDecimal; http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/42a38577/ql/src/test/queries/clientpositive/mm_all.q ---------------------------------------------------------------------- diff --cc ql/src/test/queries/clientpositive/mm_all.q index 6001e9f,0000000..e23260f mode 100644,000000..100644 --- a/ql/src/test/queries/clientpositive/mm_all.q +++ b/ql/src/test/queries/clientpositive/mm_all.q @@@ -1,370 -1,0 +1,325 @@@ +set hive.mapred.mode=nonstrict; +set hive.explain.user=false; +set hive.fetch.task.conversion=none; +set tez.grouping.min-size=1; +set tez.grouping.max-size=2; +set hive.exec.dynamic.partition.mode=nonstrict; +set hive.support.concurrency=true; +set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + + +-- Force multiple writers when reading +drop table intermediate; +create table 
intermediate(key int) partitioned by (p int) stored as orc; +insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 2; +insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 2; +insert into table intermediate partition(p='457') select distinct key from src where key >= 100 order by key asc limit 2; + + +drop table part_mm; +create table part_mm(key int) partitioned by (key_mm int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +explain insert into table part_mm partition(key_mm=455) select key from intermediate; +insert into table part_mm partition(key_mm=455) select key from intermediate; +insert into table part_mm partition(key_mm=456) select key from intermediate; +insert into table part_mm partition(key_mm=455) select key from intermediate; +select * from part_mm order by key, key_mm; + +-- TODO: doesn't work truncate table part_mm partition(key_mm=455); +select * from part_mm order by key, key_mm; +truncate table part_mm; +select * from part_mm order by key, key_mm; +drop table part_mm; + +drop table simple_mm; +create table simple_mm(key int) stored as orc tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +insert into table simple_mm select key from intermediate; +select * from simple_mm order by key; +insert into table simple_mm select key from intermediate; +select * from simple_mm order by key; +truncate table simple_mm; +select * from simple_mm; +drop table simple_mm; + + +-- simple DP (no bucketing) +drop table dp_mm; + +set hive.exec.dynamic.partition.mode=nonstrict; + +set hive.merge.mapredfiles=false; +set hive.merge.sparkfiles=false; +set hive.merge.tezfiles=false; + +create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as orc + tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +insert into table dp_mm partition (key1='123', key2) select key, key from intermediate; + +select * from dp_mm order by key; + +drop table dp_mm; + + +-- union + +create table union_mm(id int) tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +insert into table union_mm +select temps.p from ( +select key as p from intermediate +union all +select key + 1 as p from intermediate ) temps; + +select * from union_mm order by id; + +insert into table union_mm +select p from +( +select key + 1 as p from intermediate +union all +select key from intermediate +) tab group by p +union all +select key + 2 as p from intermediate; + +select * from union_mm order by id; + +insert into table union_mm +SELECT p FROM +( + SELECT key + 1 as p FROM intermediate + UNION ALL + SELECT key as p FROM ( + SELECT distinct key FROM ( + SELECT key FROM ( + SELECT key + 2 as key FROM intermediate + UNION ALL + SELECT key FROM intermediate + )t1 + group by key)t2 + )t3 +)t4 +group by p; + + +select * from union_mm order by id; +drop table union_mm; + + +create table partunion_mm(id int) partitioned by (key int) tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +insert into table partunion_mm partition(key) +select temps.* from ( +select key as p, key from intermediate +union all +select key + 1 as p, key + 1 from intermediate ) temps; + +select * from partunion_mm order by id; +drop table partunion_mm; + + + +create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) + 
stored as directories tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +insert into table skew_mm +select key, key, key from intermediate; + +select * from skew_mm order by k2, k1, k4; +drop table skew_mm; + + +create table skew_dp_union_mm(k1 int, k2 int, k4 int) partitioned by (k3 int) +skewed by (k1, k4) on ((0,0),(1,1),(2,2),(3,3)) stored as directories tblproperties ("transactional"="true", "transactional_properties"="insert_only"); + +insert into table skew_dp_union_mm partition (k3) +select key as i, key as j, key as k, key as l from intermediate +union all +select key +1 as i, key +2 as j, key +3 as k, key +4 as l from intermediate; + + +select * from skew_dp_union_mm order by k2, k1, k4; +drop table skew_dp_union_mm; + + + +set hive.merge.orcfile.stripe.level=true; +set hive.merge.tezfiles=true; +set hive.merge.mapfiles=true; +set hive.merge.mapredfiles=true; + + +create table merge0_mm (id int) stored as orc tblproperties("transactional"="true", "transactional_properties"="insert_only"); + +insert into table merge0_mm select key from intermediate; +select * from merge0_mm; + +set tez.grouping.split-count=1; +insert into table merge0_mm select key from intermediate; +set tez.grouping.split-count=0; +select * from merge0_mm; + +drop table merge0_mm; + + +create table merge2_mm (id int) tblproperties("transactional"="true", "transactional_properties"="insert_only"); + +insert into table merge2_mm select key from intermediate; +select * from merge2_mm; + +set tez.grouping.split-count=1; +insert into table merge2_mm select key from intermediate; +set tez.grouping.split-count=0; +select * from merge2_mm; + +drop table merge2_mm; + + +create table merge1_mm (id int) partitioned by (key int) stored as orc tblproperties("transactional"="true", "transactional_properties"="insert_only"); + +insert into table merge1_mm partition (key) select key, key from intermediate; +select * from merge1_mm order by id, key; + +set tez.grouping.split-count=1; +insert into table merge1_mm partition (key) select key, key from intermediate; +set tez.grouping.split-count=0; +select * from merge1_mm order by id, key; + +drop table merge1_mm; + +set hive.merge.tezfiles=false; +set hive.merge.mapfiles=false; +set hive.merge.mapredfiles=false; + +-- TODO: need to include merge+union+DP, but it's broken for now + + +drop table ctas0_mm; +create table ctas0_mm tblproperties ("transactional"="true", "transactional_properties"="insert_only") as select * from intermediate; +select * from ctas0_mm; +drop table ctas0_mm; + +drop table ctas1_mm; +create table ctas1_mm tblproperties ("transactional"="true", "transactional_properties"="insert_only") as + select * from intermediate union all select * from intermediate; +select * from ctas1_mm; +drop table ctas1_mm; + + - drop table load0_mm; - create table load0_mm (key string, value string) stored as textfile tblproperties("transactional"="true", "transactional_properties"="insert_only"); - load data local inpath '../../data/files/kv1.txt' into table load0_mm; - select count(1) from load0_mm; - load data local inpath '../../data/files/kv2.txt' into table load0_mm; - select count(1) from load0_mm; - load data local inpath '../../data/files/kv2.txt' overwrite into table load0_mm; - select count(1) from load0_mm; - drop table load0_mm; - - - drop table intermediate2; - create table intermediate2 (key string, value string) stored as textfile - location 'file:${system:test.tmp.dir}/intermediate2'; - load data local inpath 
'../../data/files/kv1.txt' into table intermediate2; - load data local inpath '../../data/files/kv2.txt' into table intermediate2; - load data local inpath '../../data/files/kv3.txt' into table intermediate2; - - drop table load1_mm; - create table load1_mm (key string, value string) stored as textfile tblproperties("transactional"="true", "transactional_properties"="insert_only"); - load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' into table load1_mm; - load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv1.txt' into table load1_mm; - select count(1) from load1_mm; - load data local inpath '../../data/files/kv1.txt' into table intermediate2; - load data local inpath '../../data/files/kv2.txt' into table intermediate2; - load data local inpath '../../data/files/kv3.txt' into table intermediate2; - load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' overwrite into table load1_mm; - select count(1) from load1_mm; - load data local inpath '../../data/files/kv2.txt' into table intermediate2; - load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' overwrite into table load1_mm; - select count(1) from load1_mm; - drop table load1_mm; - - drop table load2_mm; - create table load2_mm (key string, value string) - partitioned by (k int, l int) stored as textfile tblproperties("transactional"="true", "transactional_properties"="insert_only"); - load data local inpath '../../data/files/kv1.txt' into table intermediate2; - load data local inpath '../../data/files/kv2.txt' into table intermediate2; - load data local inpath '../../data/files/kv3.txt' into table intermediate2; - load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' into table load2_mm partition(k=5, l=5); - select count(1) from load2_mm; - drop table load2_mm; - drop table intermediate2; - - +drop table multi0_1_mm; +drop table multi0_2_mm; +create table multi0_1_mm (key int, key2 int) tblproperties("transactional"="true", "transactional_properties"="insert_only"); +create table multi0_2_mm (key int, key2 int) tblproperties("transactional"="true", "transactional_properties"="insert_only"); + +--from intermediate +--insert overwrite table multi0_1_mm select key, p +--insert overwrite table multi0_2_mm select p, key; +insert into table multi0_1_mm select key, p from intermediate; +insert into table multi0_2_mm select p, key from intermediate; + +select * from multi0_1_mm order by key, key2; +select * from multi0_2_mm order by key, key2; + +set hive.merge.mapredfiles=true; +set hive.merge.sparkfiles=true; +set hive.merge.tezfiles=true; + +--from intermediate +--insert into table multi0_1_mm select p, key +--insert overwrite table multi0_2_mm select key, p; +insert into table multi0_1_mm select p, key from intermediate; +insert into table multi0_2_mm select key, p from intermediate; +select * from multi0_1_mm order by key, key2; +select * from multi0_2_mm order by key, key2; + +set hive.merge.mapredfiles=false; +set hive.merge.sparkfiles=false; +set hive.merge.tezfiles=false; + +drop table multi0_1_mm; +drop table multi0_2_mm; + + +drop table multi1_mm; +create table multi1_mm (key int, key2 int) partitioned by (p int) tblproperties("transactional"="true", "transactional_properties"="insert_only"); +from intermediate +insert into table multi1_mm partition(p=1) select p, key +insert into table multi1_mm partition(p=2) select key, p; +select * from multi1_mm order by key, key2, p; +--from intermediate +--insert into table multi1_mm partition(p=2) select p, key +--insert 
overwrite table multi1_mm partition(p=1) select key, p; +insert into table multi1_mm partition(p=2) select p, key from intermediate; +insert into table multi1_mm partition(p=1) select key, p from intermediate; +select * from multi1_mm order by key, key2, p; + +from intermediate +insert into table multi1_mm partition(p) select p, key, p +insert into table multi1_mm partition(p=1) select key, p; +select key, key2, p from multi1_mm order by key, key2, p; + +from intermediate +insert into table multi1_mm partition(p) select p, key, 1 +insert into table multi1_mm partition(p=1) select key, p; +select key, key2, p from multi1_mm order by key, key2, p; +drop table multi1_mm; + + + + +set datanucleus.cache.collections=false; +set hive.stats.autogather=true; + +drop table stats_mm; +create table stats_mm(key int) tblproperties("transactional"="true", "transactional_properties"="insert_only"); +--insert overwrite table stats_mm select key from intermediate; +insert into table stats_mm select key from intermediate; +desc formatted stats_mm; + +insert into table stats_mm select key from intermediate; +desc formatted stats_mm; +drop table stats_mm; + +drop table stats2_mm; +create table stats2_mm tblproperties("transactional"="true", "transactional_properties"="insert_only") as select array(key, value) from src; +desc formatted stats2_mm; +drop table stats2_mm; + + +set hive.optimize.skewjoin=true; +set hive.skewjoin.key=2; +set hive.optimize.metadataonly=false; + +CREATE TABLE skewjoin_mm(key INT, value STRING) STORED AS TEXTFILE tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +FROM src src1 JOIN src src2 ON (src1.key = src2.key) INSERT into TABLE skewjoin_mm SELECT src1.key, src2.value; +select count(distinct key) from skewjoin_mm; +drop table skewjoin_mm; + +set hive.optimize.skewjoin=false; + +set hive.optimize.index.filter=true; +set hive.auto.convert.join=false; +CREATE TABLE parquet1_mm(id INT) STORED AS PARQUET tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +INSERT INTO parquet1_mm VALUES(1), (2); +CREATE TABLE parquet2_mm(id INT, value STRING) STORED AS PARQUET tblproperties ("transactional"="true", "transactional_properties"="insert_only"); +INSERT INTO parquet2_mm VALUES(1, 'value1'); +INSERT INTO parquet2_mm VALUES(1, 'value2'); +select parquet1_mm.id, t1.value, t2.value FROM parquet1_mm + JOIN parquet2_mm t1 ON parquet1_mm.id=t1.id + JOIN parquet2_mm t2 ON parquet1_mm.id=t2.id +where t1.value = 'value1' and t2.value = 'value2'; +drop table parquet1_mm; +drop table parquet2_mm; + +set hive.auto.convert.join=true; + + +DROP TABLE IF EXISTS temp1; +CREATE TEMPORARY TABLE temp1 (a int) TBLPROPERTIES ("transactional"="true", "transactional_properties"="insert_only"); +INSERT INTO temp1 SELECT key FROM intermediate; +DESC EXTENDED temp1; +SELECT * FROM temp1; + + +drop table intermediate; + + +
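For readers skimming the patch, below is a minimal, hypothetical HiveQL sketch (not part of the commit) of the insert-only ("micro-managed"/MM) transactional-table pattern that mm_all.q and the new compaction tests exercise. The table name mm_demo is invented for illustration; the session settings and tblproperties are copied from mm_all.q, and the compaction behaviour noted in the comments follows the testMmTableCompaction description above.

set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

drop table if exists mm_demo;
create table mm_demo(key int) stored as orc
  tblproperties ("transactional"="true", "transactional_properties"="insert_only");

-- each insert writes its own delta_* directory under the table location
insert into mm_demo values(1),(2);
insert into mm_demo values(3);

-- per the testMmTableCompaction comments, compaction on an MM table only removes
-- delta directories left behind by aborted transactions; valid deltas are kept
alter table mm_demo compact 'MINOR';

select * from mm_demo order by key;
drop table mm_demo;

The same tblproperties pair, ("transactional"="true", "transactional_properties"="insert_only"), is what every *_mm table created in mm_all.q declares.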