HIVE-15019 : handle import for MM tables (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8004f71e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8004f71e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8004f71e Branch: refs/heads/hive-14535 Commit: 8004f71e50c682a664215e04e2b387195c5932da Parents: 98b0b8e Author: Sergey Shelukhin <[email protected]> Authored: Thu Oct 20 15:05:24 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Thu Oct 20 15:05:24 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/common/ValidWriteIds.java | 15 +- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../hadoop/hive/metastore/HiveMetaStore.java | 3 + .../apache/hadoop/hive/ql/exec/CopyTask.java | 48 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 30 +- .../hive/ql/exec/DependencyCollectionTask.java | 1 - .../hadoop/hive/ql/exec/ImportCommitTask.java | 65 ++ .../hadoop/hive/ql/exec/ImportCommitWork.java | 48 ++ .../apache/hadoop/hive/ql/exec/MoveTask.java | 10 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 2 + .../apache/hadoop/hive/ql/metadata/Hive.java | 7 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 8 +- .../apache/hadoop/hive/ql/parse/EximUtil.java | 6 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 206 ++++--- .../hive/ql/parse/LoadSemanticAnalyzer.java | 1 - .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +- .../hadoop/hive/ql/parse/TaskCompiler.java | 2 +- .../apache/hadoop/hive/ql/plan/CopyWork.java | 9 + .../hadoop/hive/ql/plan/CreateTableDesc.java | 14 +- .../hadoop/hive/ql/plan/LoadTableDesc.java | 14 +- ql/src/test/queries/clientpositive/mm_all.q | 119 +++- ql/src/test/queries/clientpositive/mm_current.q | 83 ++- .../results/clientpositive/llap/mm_all.q.out | 586 +++++++++++++++++-- .../clientpositive/llap/mm_current.q.out | 447 ++++++-------- 24 files changed, 1209 insertions(+), 519 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java index 7ef4f55..df0278c 100644 --- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java +++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java @@ -137,7 +137,20 @@ public class ValidWriteIds { } } - + public static class AnyIdDirFilter implements PathFilter { + @Override + public boolean accept(Path path) { + String name = path.getName(); + if (!name.startsWith(MM_PREFIX + "_")) return false; + String idStr = name.substring(MM_PREFIX.length() + 1); + try { + Long.parseLong(idStr); + } catch (NumberFormatException ex) { + return false; + } + return true; + } + } public static Long extractWriteId(Path file) { String fileName = file.getName(); String[] parts = fileName.split("_", 3); http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index ccc29f8..8a00f07 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1209,6 +1209,8 @@ public class HiveConf extends Configuration { HIVETESTMODE("hive.test.mode", false, "Whether Hive is running in test mode. If yes, it turns on sampling and prefixes the output tablename.", false), + HIVEEXIMTESTMODE("hive.exim.test.mode", false, + "The subset of test mode that only enables custom path handling for ExIm.", false), HIVETESTMODEPREFIX("hive.test.mode.prefix", "test_", "In test mode, specfies prefixes for the output table", false), HIVETESTMODESAMPLEFREQ("hive.test.mode.samplefreq", 32, http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 9f16eb2..4436f3a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -6570,6 +6570,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { private MTableWrite getActiveTableWrite(RawStore ms, String dbName, String tblName, long writeId) throws MetaException { MTableWrite tw = ms.getTableWrite(dbName, tblName, writeId); + if (tw == null) { + return null; + } assert tw.getState().length() == 1; char state = tw.getState().charAt(0); if (state != MM_WRITE_OPEN) { http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index cbe0aca..a8a44bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -18,14 +18,20 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.FileNotFoundException; +import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; @@ -37,7 +43,6 @@ import org.apache.hadoop.util.StringUtils; * CopyTask implementation. **/ public class CopyTask extends Task<CopyWork> implements Serializable { - private static final long serialVersionUID = 1L; private static transient final Logger LOG = LoggerFactory.getLogger(CopyTask.class); @@ -60,7 +65,7 @@ public class CopyTask extends Task<CopyWork> implements Serializable { FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); + FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.isSourceMm()); if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -97,6 +102,45 @@ public class CopyTask extends Task<CopyWork> implements Serializable { } } + // Note: initially copied from LoadSemanticAnalyzer. + private static FileStatus[] matchFilesOrDir( + FileSystem fs, Path path, boolean isSourceMm) throws IOException { + if (!isSourceMm) return matchFilesOneDir(fs, path, null); + // TODO: this doesn't handle list bucketing properly. Does the original exim do that? + FileStatus[] mmDirs = fs.listStatus(path, new ValidWriteIds.AnyIdDirFilter()); + if (mmDirs == null || mmDirs.length == 0) return null; + List<FileStatus> allFiles = new ArrayList<FileStatus>(); + for (FileStatus mmDir : mmDirs) { + Utilities.LOG14535.info("Found source MM directory " + mmDir.getPath()); + matchFilesOneDir(fs, mmDir.getPath(), allFiles); + } + return allFiles.toArray(new FileStatus[allFiles.size()]); + } + + private static FileStatus[] matchFilesOneDir( + FileSystem fs, Path path, List<FileStatus> result) throws IOException { + FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); + if (srcs != null && srcs.length == 1) { + if (srcs[0].isDirectory()) { + srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + } + if (result != null && srcs != null) { + for (int i = 0; i < srcs.length; ++i) { + result.add(srcs[i]); + } + } + return srcs; + } + + private static final class EximPathFilter implements PathFilter { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith("."); + } + } + @Override public StageType getType() { return StageType.COPY; http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index bb9eaf5..1f89f27 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4045,7 +4045,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { tbl.getDataLocation()); // create the table - if (crtTbl.getReplaceMode()){ + if (crtTbl.getReplaceMode()) { // replace-mode creates are really alters using CreateTableDesc. try { db.alterTable(tbl.getDbName()+"."+tbl.getTableName(),tbl,null); @@ -4059,28 +4059,36 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } else { db.createTable(tbl, crtTbl.getIfNotExists()); } - if (crtTbl.isCTAS()) { + Long mmWriteId = crtTbl.getInitialMmWriteId(); + if (crtTbl.isCTAS() || mmWriteId != null) { Table createdTable = db.getTable(tbl.getDbName(), tbl.getTableName()); - if (crtTbl.getInitialWriteId() != null) { + if (mmWriteId != null) { // TODO# this would be retrieved via ACID before the query runs; for now we rely on it // being zero at start; we can't create a write ID before we create the table here. long initialWriteId = db.getNextTableWriteId(tbl.getDbName(), tbl.getTableName()); - if (initialWriteId != crtTbl.getInitialWriteId()) { - throw new HiveException("Initial write ID mismatch - expected " - + crtTbl.getInitialWriteId() + " but got " + initialWriteId); + if (initialWriteId != mmWriteId) { + throw new HiveException("Initial write ID mismatch - expected " + mmWriteId + + " but got " + initialWriteId); + } + // CTAS create the table on a directory that already exists; import creates the table + // first (in parallel with copies?), then commits after all the loads. + if (crtTbl.isCTAS()) { + db.commitMmTableWrite(tbl, initialWriteId); } - db.commitMmTableWrite(tbl, initialWriteId); } - DataContainer dc = new DataContainer(createdTable.getTTable()); - SessionState.get().getLineageState().setLineage( - createdTable.getPath(), dc, createdTable.getCols() - ); + if (crtTbl.isCTAS()) { + DataContainer dc = new DataContainer(createdTable.getTTable()); + SessionState.get().getLineageState().setLineage( + createdTable.getPath(), dc, createdTable.getCols() + ); + } } } work.getOutputs().add(new WriteEntity(tbl, WriteEntity.WriteType.DDL_NO_LOCK)); return 0; } + /** * Create a new table like an existing table. * http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/DependencyCollectionTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DependencyCollectionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DependencyCollectionTask.java index 9189cfc..e639572 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DependencyCollectionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DependencyCollectionTask.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; -import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.api.StageType; http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java new file mode 100644 index 0000000..efa9bc3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; +import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.util.StringUtils; + +public class ImportCommitTask extends Task<ImportCommitWork> { + + private static final long serialVersionUID = 1L; + + public ImportCommitTask() { + super(); + } + + @Override + public int execute(DriverContext driverContext) { + Utilities.LOG14535.info("Executing ImportCommit for " + work.getMmWriteId()); + + try { + if (driverContext.getCtx().getExplainAnalyze() == AnalyzeState.RUNNING) { + return 0; + } + Hive db = getHive(); + Table tbl = db.getTable(work.getDbName(), work.getTblName()); + db.commitMmTableWrite(tbl, work.getMmWriteId()); + return 0; + } catch (Exception e) { + console.printError("Failed with exception " + e.getMessage(), "\n" + + StringUtils.stringifyException(e)); + setException(e); + return 1; + } + } + + @Override + public StageType getType() { + return StageType.MOVE; // The commit for import is normally done as part of MoveTask. + } + + @Override + public String getName() { + return "IMPORT_COMMIT"; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java new file mode 100644 index 0000000..f62d237 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +@Explain(displayName = "Import Commit", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ImportCommitWork implements Serializable { + private static final long serialVersionUID = 1L; + private String dbName, tblName; + private long mmWriteId; + + public ImportCommitWork(String dbName, String tblName, long mmWriteId) { + this.mmWriteId = mmWriteId; + this.dbName = dbName; + this.tblName = tblName; + } + + public long getMmWriteId() { + return mmWriteId; + } + + public String getDbName() { + return dbName; + } + + public String getTblName() { + return tblName; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index eea4357..99c52fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -326,6 +326,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable { if (tbd.getPartitionSpec().size() == 0) { dc = new DataContainer(table.getTTable()); Utilities.LOG14535.info("loadTable called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName()); + if (tbd.isMmTable() && !tbd.isCommitMmWrite()) { + throw new HiveException( + "Only single-partition LoadTableDesc can skip commiting write ID"); + } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isAcid, hasFollowingStatsTask(), tbd.getMmWriteId()); @@ -386,12 +390,13 @@ public class MoveTask extends Task<MoveWork> implements Serializable { db.validatePartitionNameCharacters(partVals); Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + " into " + tbd.getTable().getTableName()); + boolean isCommitMmWrite = tbd.isCommitMmWrite(); db.loadSinglePartition(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getPartitionSpec(), tbd.getReplace(), tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), (work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.INSERT_ONLY), - hasFollowingStatsTask(), tbd.getMmWriteId()); + hasFollowingStatsTask(), tbd.getMmWriteId(), isCommitMmWrite); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); if (ti.bucketCols != null || ti.sortCols != null) { @@ -428,6 +433,9 @@ public class MoveTask extends Task<MoveWork> implements Serializable { // iterate over it and call loadPartition() here. // The reason we don't do inside HIVE-1361 is the latter is large and we // want to isolate any potential issue it may introduce. + if (tbd.isMmTable() && !tbd.isCommitMmWrite()) { + throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID"); + } Map<Map<String, String>, Partition> dp = db.loadDynamicPartitions( tbd.getSourcePath(), http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 14fd61a..822ff41 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -100,6 +100,8 @@ public final class TaskFactory { MergeFileTask.class)); taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class, DependencyCollectionTask.class)); + taskvec.add(new TaskTuple<ImportCommitWork>(ImportCommitWork.class, + ImportCommitTask.class)); taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class, PartialScanTask.class)); taskvec.add(new TaskTuple<IndexMetadataChangeWork>(IndexMetadataChangeWork.class, http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index d6dc2d3..30b22d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -1502,14 +1502,15 @@ public class Hive { public void loadSinglePartition(Path loadPath, String tableName, Map<String, String> partSpec, boolean replace, boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir, boolean isSrcLocal, boolean isAcid, - boolean hasFollowingStatsTask, Long mmWriteId) throws HiveException { + boolean hasFollowingStatsTask, Long mmWriteId, boolean isCommitMmWrite) + throws HiveException { Table tbl = getTable(tableName); boolean isMmTableWrite = (mmWriteId != null); Preconditions.checkState(isMmTableWrite == MetaStoreUtils.isMmTable(tbl.getParameters())); loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs, isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask, mmWriteId); - if (isMmTableWrite) { - // The assumption behind committing here is that this partition is the only one outputted + if (isMmTableWrite && isCommitMmWrite) { + // The assumption behind committing here is that this partition is the only one outputted. commitMmTableWrite(tbl, mmWriteId); } } http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 03c2e79..a018b54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -1060,10 +1060,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { // so the operation is atomic. Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); truncateTblDesc.setOutputDir(queryTmpdir); + // TODO# movetask is created here; handle MM tables LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<String, String>() : partSpec); + partSpec == null ? new HashMap<String, String>() : partSpec, null); ltd.setLbCtx(lbCtx); - // TODO# movetask is created here; handle MM tables Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); truncateTask.addDependentTask(moveTsk); @@ -1677,10 +1677,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { TableDesc tblDesc = Utilities.getTableDesc(tblObj); Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); mergeDesc.setOutputDir(queryTmpdir); + // No need to handle MM tables - unsupported path. LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<String, String>() : partSpec); + partSpec == null ? new HashMap<String, String>() : partSpec, null); ltd.setLbCtx(lbCtx); - // No need to handle MM tables - unsupported path. Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); mergeTask.addDependentTask(moveTsk); http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 167f7a5..3826d9f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -74,7 +74,8 @@ public class EximUtil { */ static URI getValidatedURI(HiveConf conf, String dcPath) throws SemanticException { try { - boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); + boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE) + || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE); URI uri = new Path(dcPath).toUri(); String scheme = uri.getScheme(); String authority = uri.getAuthority(); @@ -136,7 +137,8 @@ public class EximUtil { } public static String relativeToAbsolutePath(HiveConf conf, String location) throws SemanticException { - boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE); + boolean testMode = conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODE) + || conf.getBoolVar(HiveConf.ConfVars.HIVEEXIMTESTMODE); if (testMode) { URI uri = new Path(location).toUri(); String scheme = uri.getScheme(); http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 043de2f..2a525e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -38,7 +38,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.ValidWriteIds; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -48,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ImportCommitWork; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -121,6 +124,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } // get partition metadata if partition specified if (child.getChildCount() == 2) { + @SuppressWarnings("unused") // TODO: wtf? ASTNode partspec = (ASTNode) child.getChild(1); isPartSpecSet = true; parsePartitionSpec(child, parsedPartSpec); @@ -158,9 +162,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { // Create table associated with the import // Executed if relevant, and used to contain all the other details about the table if not. - CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable()); + CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname, rv.getTable()); + boolean isSourceMm = MetaStoreUtils.isMmTable(tblDesc.getTblProps()); - if (isExternalSet){ + if (isExternalSet) { + if (isSourceMm) { + throw new SemanticException("Cannot import an MM table as external"); + } tblDesc.setExternal(isExternalSet); // This condition-check could have been avoided, but to honour the old // default of not calling if it wasn't set, we retain that behaviour. @@ -219,21 +227,32 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Table table = tableIfExists(tblDesc); if (table != null){ - checkTable(table, tblDesc,replicationSpec); + checkTable(table, tblDesc, replicationSpec); LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked"); tableExists = true; } + Long mmWriteId = null; + if (table != null && MetaStoreUtils.isMmTable(table.getParameters())) { + mmWriteId = db.getNextTableWriteId(table.getDbName(), table.getTableName()); + } else if (table == null && isSourceMm) { + // We could import everything as is - directories and IDs, but that won't work with ACID + // txn ids in future. So, let's import everything into the new MM directory with ID == 0. + mmWriteId = 0l; + } + if (mmWriteId != null) { + tblDesc.setInitialMmWriteId(mmWriteId); + } if (!replicationSpec.isInReplicationScope()){ createRegularImportTasks( rootTasks, tblDesc, partitionDescs, isPartSpecSet, replicationSpec, table, - fromURI, fs, wh); + fromURI, fs, wh, mmWriteId, isSourceMm); } else { createReplImportTasks( rootTasks, tblDesc, partitionDescs, isPartSpecSet, replicationSpec, table, - fromURI, fs, wh); + fromURI, fs, wh, mmWriteId, isSourceMm); } } catch (SemanticException e) { throw e; @@ -318,45 +337,42 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return tblDesc; } - private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) { + private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, + Long mmWriteId, boolean isSourceMm) { Path dataPath = new Path(fromURI.toString(), "data"); - Path tmpPath = ctx.getExternalTmpPath(tgtPath); - Task<?> copyTask = TaskFactory.get(new CopyWork(dataPath, - tmpPath, false), conf); - // TODO# we assume mm=false here - LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), new TreeMap<String, String>(), - replace, null); - Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, false), conf); + Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtPath) + : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId)); + Utilities.LOG14535.info("adding import work for table with source location: " + + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName())); + + CopyWork cv = new CopyWork(dataPath, destPath, false); + cv.setIsSourceMm(isSourceMm); + LoadTableDesc loadTableWork = new LoadTableDesc(destPath, + Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId); + MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false); + @SuppressWarnings("unchecked") + Task<?> loadTableTask = TaskFactory.get(mv, conf), copyTask = TaskFactory.get(cv, conf); copyTask.addDependentTask(loadTableTask); rootTasks.add(copyTask); return loadTableTask; } + @SuppressWarnings("unchecked") private Task<?> createTableTask(CreateTableDesc tableDesc){ - return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), - tableDesc - ), conf); + return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf); } + @SuppressWarnings("unchecked") private Task<?> dropTableTask(Table table){ - return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), - new DropTableDesc(table.getTableName(), null, true, true, null) - ), conf); + return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), + new DropTableDesc(table.getTableName(), null, true, true, null)), conf); } + @SuppressWarnings("unchecked") private Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc) { tableDesc.setReplaceMode(true); - return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), - tableDesc - ), conf); + return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf); } private Task<? extends Serializable> alterSinglePartition( @@ -365,50 +381,54 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) { addPartitionDesc.setReplaceMode(true); addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location - return TaskFactory.get(new DDLWork( - getInputs(), - getOutputs(), - addPartitionDesc - ), conf); + @SuppressWarnings("unchecked") + Task<?> r = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf); + return r; } private Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc, - Table table, Warehouse wh, - AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec) + Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, + ReplicationSpec replicationSpec, Long mmWriteId, boolean isSourceMm, Task<?> commitTask) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { LOG.debug("Importing in-place: adding AddPart for partition " + partSpecToString(partSpec.getPartSpec())); // addPartitionDesc already has the right partition location + @SuppressWarnings("unchecked") Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf); return addPartTask; } else { String srcLocation = partSpec.getLocation(); fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec); - LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " - + partSpecToString(partSpec.getPartSpec()) - + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path tmpPath = ctx.getExternalTmpPath(tgtLocation); - Task<?> copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), - tmpPath, false), conf); - Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(), - getOutputs(), addPartitionDesc), conf); - // TODO# we assume mm=false here - LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), - partSpec.getPartSpec(), true, null); + Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId)); + Path moveTaskSrc = mmWriteId == null ? destPath : tgtLocation; + Utilities.LOG14535.info("adding import work for partition with source location: " + + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm " + + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); + CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); + cw.setIsSourceMm(isSourceMm); + DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc); + LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), + partSpec.getPartSpec(), true, mmWriteId); loadTableWork.setInheritTableSpecs(false); - // TODO# movetask is created here; handle MM tables - Task<?> loadPartTask = TaskFactory.get(new MoveWork( - getInputs(), getOutputs(), loadTableWork, null, false), - conf); + // Do not commit the write ID from each task; need to commit once. + // TODO: we should just change the import to use a single MoveTask, like dynparts. + loadTableWork.setIntermediateInMmWrite(mmWriteId != null); + MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false); + @SuppressWarnings("unchecked") + Task<?> copyTask = TaskFactory.get(cw, conf), addPartTask = TaskFactory.get(dw, conf), + loadPartTask = TaskFactory.get(mv, conf); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); rootTasks.add(copyTask); + if (commitTask != null) { + loadPartTask.addDependentTask(commitTask); + } return addPartTask; } } @@ -572,13 +592,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Class<? extends OutputFormat> replaced = HiveFileFormatUtils .getOutputFormatSubstitute(origin); if (replaced == null) { - throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE - .getMsg()); + throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg()); } importedofc = replaced.getCanonicalName(); } catch(Exception e) { - throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE - .getMsg()); + throw new SemanticException(ErrorMsg.INVALID_OUTPUT_FORMAT_TYPE.getMsg()); } if ((!existingifc.equals(importedifc)) || (!existingofc.equals(importedofc))) { @@ -685,43 +703,42 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { /** * Create tasks for regular import, no repl complexity */ - private void createRegularImportTasks( - List<Task<? extends Serializable>> rootTasks, - CreateTableDesc tblDesc, - List<AddPartitionDesc> partitionDescs, - boolean isPartSpecSet, - ReplicationSpec replicationSpec, - Table table, URI fromURI, FileSystem fs, Warehouse wh) + private void createRegularImportTasks(List<Task<? extends Serializable>> rootTasks, + CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet, + ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh, + Long mmWriteId, boolean isSourceMm) throws HiveException, URISyntaxException, IOException, MetaException { - if (table != null){ + if (table != null) { if (table.isPartitioned()) { LOG.debug("table partitioned"); + Task<?> ict = createImportCommitTask(table.getDbName(), table.getTableName(), mmWriteId); for (AddPartitionDesc addPartitionDesc : partitionDescs) { Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = db.getPartition(table, partSpec, false)) == null) { - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, + replicationSpec, mmWriteId, isSourceMm, ict)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); } } - } else { LOG.debug("table non-partitioned"); // ensure if destination is not empty only for regular import Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); - loadTable(fromURI, table, false, tgtPath); + loadTable(fromURI, table, false, tgtPath, mmWriteId, isSourceMm); } // Set this to read because we can't overwrite any existing partitions outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); } else { LOG.debug("table " + tblDesc.getTableName() + " does not exist"); + @SuppressWarnings("unchecked") Task<?> t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf); table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); Database parentDb = db.getDatabase(tblDesc.getDatabaseName()); @@ -731,9 +748,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED)); if (isPartitioned(tblDesc)) { + Task<?> ict = createImportCommitTask( + tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId); for (AddPartitionDesc addPartitionDesc : partitionDescs) { - t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, + replicationSpec, mmWriteId, isSourceMm, ict)); } } else { LOG.debug("adding dependent CopyWork/MoveWork for table"); @@ -750,24 +769,30 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec); - t.addDependentTask(loadTable(fromURI, table, false, tablePath)); + t.addDependentTask(loadTable(fromURI, table, false, tablePath, mmWriteId, isSourceMm)); } } rootTasks.add(t); } } + private Task<?> createImportCommitTask(String dbName, String tblName, Long mmWriteId) { + @SuppressWarnings("unchecked") + Task<ImportCommitWork> ict = (mmWriteId == null) ? null : TaskFactory.get( + new ImportCommitWork(dbName, tblName, mmWriteId), conf); + return ict; + } + /** * Create tasks for repl import */ - private void createReplImportTasks( - List<Task<? extends Serializable>> rootTasks, - CreateTableDesc tblDesc, - List<AddPartitionDesc> partitionDescs, - boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh) + private void createReplImportTasks(List<Task<? extends Serializable>> rootTasks, + CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet, + ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh, + Long mmWriteId, boolean isSourceMm) throws HiveException, URISyntaxException, IOException, MetaException { - Task dr = null; + Task<?> dr = null; WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){ @@ -810,18 +835,21 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { lockType = WriteEntity.WriteType.DDL_SHARED; } - Task t = createTableTask(tblDesc); + Task<?> t = createTableTask(tblDesc); table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { + Task<?> ict = createImportCommitTask( + tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId); for (AddPartitionDesc addPartitionDesc : partitionDescs) { - t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, + addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict)); } } else { LOG.debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()))); + t.addDependentTask(loadTable( + fromURI, table, true, new Path(tblDesc.getLocation()), mmWriteId, isSourceMm)); } } if (dr == null){ @@ -837,22 +865,25 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (table.isPartitioned()) { LOG.debug("table partitioned"); for (AddPartitionDesc addPartitionDesc : partitionDescs) { - Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; - + Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask( + tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId); if ((ptn = db.getPartition(table, partSpec, false)) == null) { if (!replicationSpec.isMetadataOnly()){ - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, + replicationSpec, mmWriteId, isSourceMm, ict)); } } else { // If replicating, then the partition already existing means we need to replace, maybe, if // the destination ptn's repl.last.id is older than the replacement's. if (replicationSpec.allowReplacementInto(ptn)){ if (!replicationSpec.isMetadataOnly()){ - rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec)); + rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, + addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict)); } else { - rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn)); + rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh, + addPartitionDesc, replicationSpec, ptn)); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; @@ -876,7 +907,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return; // silently return, table is newer than our replacement. } if (!replicationSpec.isMetadataOnly()) { - loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into + // repl-imports are replace-into + loadTable(fromURI, table, true, new Path(fromURI), mmWriteId, isSourceMm); } else { rootTasks.add(alterTableTask(tblDesc)); } http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index e38b0f7..26274f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -261,7 +261,6 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { } } - // TODO# movetask is created here; handle MM tables Long mmWriteId = null; Table tbl = ts.tableHandle; if (MetaStoreUtils.isMmTable(tbl.getParameters())) { http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f74c0a9..0d83abf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6692,7 +6692,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // TODO# this should really get current ACID txn; assuming ACID works correctly the txn // should have been opened to create the ACID table. For now use the first ID. mmWriteId = 0l; - tblDesc.setInitialWriteId(mmWriteId); + tblDesc.setInitialMmWriteId(mmWriteId); } } else if (viewDesc != null) { field_schemas = new ArrayList<FieldSchema>(); http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index e177925..d09e401 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -317,7 +317,7 @@ public abstract class TaskCompiler { if (pCtx.getQueryProperties().isCTAS()) { CreateTableDesc ctd = pCtx.getCreateTable(); dataSinkForCtas = ctd.getAndUnsetWriter(); - mmWriteIdForCtas = ctd.getInitialWriteId(); + mmWriteIdForCtas = ctd.getInitialMmWriteId(); loc = ctd.getLocation(); } else { loc = pCtx.getCreateViewDesc().getLocation(); http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index 9a4e782..2e484ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,6 +33,7 @@ public class CopyWork implements Serializable { private Path fromPath; private Path toPath; private boolean errorOnSrcEmpty; + private boolean isMm = false; public CopyWork() { } @@ -65,4 +66,12 @@ public class CopyWork implements Serializable { return errorOnSrcEmpty; } + public void setIsSourceMm(boolean isMm) { + this.isMm = isMm; + } + + public boolean isSourceMm() { + return isMm ; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index 7609068..4b452b6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -94,7 +94,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable { private boolean isCTAS = false; List<SQLPrimaryKey> primaryKeys; List<SQLForeignKey> foreignKeys; - private Long initialWriteId; + private Long initialMmWriteId; // Initial MM write ID for CTAS and import. // The FSOP configuration for the FSOP that is going to write initial data during ctas. // This is not needed beyond compilation, so it is transient. private transient FileSinkDesc writer; @@ -829,14 +829,16 @@ public class CreateTableDesc extends DDLDesc implements Serializable { return tbl; } - public void setInitialWriteId(Long mmWriteId) { - this.initialWriteId = mmWriteId; + public void setInitialMmWriteId(Long mmWriteId) { + this.initialMmWriteId = mmWriteId; } - public Long getInitialWriteId() { - return initialWriteId; + public Long getInitialMmWriteId() { + return initialMmWriteId; } + + public FileSinkDesc getAndUnsetWriter() { FileSinkDesc fsd = writer; writer = null; @@ -846,6 +848,4 @@ public class CreateTableDesc extends DDLDesc implements Serializable { public void setWriter(FileSinkDesc writer) { this.writer = writer; } - - } http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index bf858b6..1b7d325 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -47,6 +47,7 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map + private boolean commitMmWriteId = true; private LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, @@ -88,9 +89,8 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc */ public LoadTableDesc(final Path sourcePath, final org.apache.hadoop.hive.ql.plan.TableDesc table, - final Map<String, String> partitionSpec) { - // TODO# we assume mm=false here - this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null); + final Map<String, String> partitionSpec, Long mmWriteId) { + this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, mmWriteId); } public LoadTableDesc(final Path sourcePath, @@ -189,4 +189,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc public Long getMmWriteId() { return mmWriteId; } + + public void setIntermediateInMmWrite(boolean b) { + this.commitMmWriteId = !b; + } + + public boolean isCommitMmWrite() { + return commitMmWriteId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/test/queries/clientpositive/mm_all.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q index 5377568..6639aaa 100644 --- a/ql/src/test/queries/clientpositive/mm_all.q +++ b/ql/src/test/queries/clientpositive/mm_all.q @@ -270,38 +270,93 @@ drop table load2_mm; drop table intermediate2; --- IMPORT - - - --- TODO# future --- ---create table exim_department ( dep_id int) stored as textfile; ---dfs -rmr target/tmp/ql/test/data/exports/exim_department; ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; ---create database importer; ---use importer; ---create table exim_department ( dep_id int) stored as textfile; ---import from 'ql/test/data/exports/exim_department'; --- --- ---create table exim_department ( dep_id int) stored as textfile; ---load data local inpath "../../data/files/test.dat" into table exim_department; ---dfs ${system:test.dfs.mkdir} target/tmp/ql/test/data/exports/exim_department/test; ---dfs -rmr target/tmp/ql/test/data/exports/exim_department; ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; --- ---create database importer; ---use importer; --- ---set hive.security.authorization.enabled=true; ---import from 'ql/test/data/exports/exim_department'; - - - --- TODO multi-insert, truncate +drop table intermediate_nonpart; +drop table intermmediate_part; +drop table intermmediate_nonpart; +create table intermediate_nonpart(key int, p int); +insert into intermediate_nonpart select * from intermediate; +create table intermmediate_nonpart(key int, p int) tblproperties('hivecommit'='true'); +insert into intermmediate_nonpart select * from intermediate; +create table intermmediate(key int) partitioned by (p int) tblproperties('hivecommit'='true'); +insert into table intermmediate partition(p) select key, p from intermediate; + +set hive.exim.test.mode=true; + +export table intermediate_nonpart to 'ql/test/data/exports/intermediate_nonpart'; +export table intermmediate_nonpart to 'ql/test/data/exports/intermmediate_nonpart'; +export table intermediate to 'ql/test/data/exports/intermediate_part'; +export table intermmediate to 'ql/test/data/exports/intermmediate_part'; + +drop table intermediate_nonpart; +drop table intermmediate_part; +drop table intermmediate_nonpart; + +-- non-MM export to MM table, with and without partitions + +drop table import0_mm; +create table import0_mm(key int, p int) tblproperties('hivecommit'='true'); +import table import0_mm from 'ql/test/data/exports/intermediate_nonpart'; +select * from import0_mm order by key, p; +drop table import0_mm; + + + +drop table import1_mm; +create table import1_mm(key int) partitioned by (p int) + stored as orc tblproperties('hivecommit'='true'); +import table import1_mm from 'ql/test/data/exports/intermediate_part'; +select * from import1_mm order by key, p; +drop table import1_mm; + + +-- MM export into new MM table, non-part and part + +drop table import2_mm; +import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; +desc import2_mm; +select * from import2_mm order by key, p; +drop table import2_mm; + +drop table import3_mm; +import table import3_mm from 'ql/test/data/exports/intermmediate_part'; +desc import3_mm; +select * from import3_mm order by key, p; +drop table import3_mm; + +-- MM export into existing MM table, non-part and partial part + +drop table import4_mm; +create table import4_mm(key int, p int) tblproperties('hivecommit'='true'); +import table import4_mm from 'ql/test/data/exports/intermmediate_nonpart'; +select * from import4_mm order by key, p; +drop table import4_mm; + +drop table import5_mm; +create table import5_mm(key int) partitioned by (p int) tblproperties('hivecommit'='true'); +import table import5_mm partition(p=455) from 'ql/test/data/exports/intermmediate_part'; +select * from import5_mm order by key, p; +drop table import5_mm; + +-- MM export into existing non-MM table, non-part and part + +drop table import6_mm; +create table import6_mm(key int, p int); +import table import6_mm from 'ql/test/data/exports/intermmediate_nonpart'; +select * from import6_mm order by key, p; +drop table import6_mm; + +drop table import7_mm; +create table import7_mm(key int) partitioned by (p int); +import table import7_mm from 'ql/test/data/exports/intermmediate_part'; +select * from import7_mm order by key, p; +drop table import7_mm; + + + +set hive.exim.test.mode=false; + + +-- TODO# multi-insert, truncate http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/test/queries/clientpositive/mm_current.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q index 391017b..ab28d35 100644 --- a/ql/src/test/queries/clientpositive/mm_current.q +++ b/ql/src/test/queries/clientpositive/mm_current.q @@ -11,53 +11,42 @@ create table intermediate(key int) partitioned by (p int) stored as orc; insert into table intermediate partition(p='455') select distinct key from src where key >= 0 order by key desc limit 2; insert into table intermediate partition(p='456') select distinct key from src where key is not null order by key asc limit 2; - - -drop table load0_mm; -create table load0_mm (key string, value string) stored as textfile tblproperties('hivecommit'='true'); -load data local inpath '../../data/files/kv1.txt' into table load0_mm; -select count(1) from load0_mm; -load data local inpath '../../data/files/kv2.txt' into table load0_mm; -select count(1) from load0_mm; -load data local inpath '../../data/files/kv2.txt' overwrite into table load0_mm; -select count(1) from load0_mm; -drop table load0_mm; - - -drop table intermediate2; -create table intermediate2 (key string, value string) stored as textfile -location 'file:${system:test.tmp.dir}/intermediate2'; -load data local inpath '../../data/files/kv1.txt' into table intermediate2; -load data local inpath '../../data/files/kv2.txt' into table intermediate2; -load data local inpath '../../data/files/kv3.txt' into table intermediate2; - -drop table load1_mm; -create table load1_mm (key string, value string) stored as textfile tblproperties('hivecommit'='true'); -load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' into table load1_mm; -load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv1.txt' into table load1_mm; -select count(1) from load1_mm; -load data local inpath '../../data/files/kv1.txt' into table intermediate2; -load data local inpath '../../data/files/kv2.txt' into table intermediate2; -load data local inpath '../../data/files/kv3.txt' into table intermediate2; -load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' overwrite into table load1_mm; -select count(1) from load1_mm; -load data local inpath '../../data/files/kv2.txt' into table intermediate2; -load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv2.txt' overwrite into table load1_mm; -select count(1) from load1_mm; -drop table load1_mm; - -drop table load2_mm; -create table load2_mm (key string, value string) - partitioned by (k int, l int) stored as textfile tblproperties('hivecommit'='true'); -load data local inpath '../../data/files/kv1.txt' into table intermediate2; -load data local inpath '../../data/files/kv2.txt' into table intermediate2; -load data local inpath '../../data/files/kv3.txt' into table intermediate2; -load data inpath 'file:${system:test.tmp.dir}/intermediate2/kv*.txt' into table load2_mm partition(k=5, l=5); -select count(1) from load2_mm; -drop table load2_mm; -drop table intermediate2; - - +drop table intermediate_nonpart; +drop table intermmediate_part; +drop table intermmediate_nonpart; + + +create table intermediate_nonpart(key int, p int); +insert into intermediate_nonpart select * from intermediate; +create table intermmediate_nonpart(key int, p int) tblproperties('hivecommit'='true'); +insert into intermmediate_nonpart select * from intermediate; +create table intermmediate(key int) partitioned by (p int) tblproperties('hivecommit'='true'); +insert into table intermmediate partition(p) select key, p from intermediate; + +set hive.exim.test.mode=true; + +export table intermediate_nonpart to 'ql/test/data/exports/intermediate_nonpart'; +export table intermmediate_nonpart to 'ql/test/data/exports/intermmediate_nonpart'; +export table intermediate to 'ql/test/data/exports/intermediate_part'; +export table intermmediate to 'ql/test/data/exports/intermmediate_part'; + +drop table intermediate_nonpart; +drop table intermmediate_part; +drop table intermmediate_nonpart; + +-- MM export into existing non-MM table, non-part and part + +drop table import6_mm; +create table import6_mm(key int, p int); +import table import6_mm from 'ql/test/data/exports/intermmediate_nonpart'; +select * from import6_mm order by key, p; +drop table import6_mm; + +drop table import7_mm; +create table import7_mm(key int) partitioned by (p int); +import table import7_mm from 'ql/test/data/exports/intermmediate_part'; +select * from import7_mm order by key, p; +drop table import7_mm; drop table intermediate; http://git-wip-us.apache.org/repos/asf/hive/blob/8004f71e/ql/src/test/results/clientpositive/llap/mm_all.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out index 6569369..7d77047 100644 --- a/ql/src/test/results/clientpositive/llap/mm_all.q.out +++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out @@ -1924,37 +1924,541 @@ POSTHOOK: query: drop table intermediate2 POSTHOOK: type: DROPTABLE POSTHOOK: Input: default@intermediate2 POSTHOOK: Output: default@intermediate2 -PREHOOK: query: -- IMPORT +PREHOOK: query: drop table intermediate_nonpart +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table intermediate_nonpart +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table intermmediate_part +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table intermmediate_part +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table intermmediate_nonpart +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table intermmediate_nonpart +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table intermediate_nonpart(key int, p int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@intermediate_nonpart +POSTHOOK: query: create table intermediate_nonpart(key int, p int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@intermediate_nonpart +PREHOOK: query: insert into intermediate_nonpart select * from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@intermediate_nonpart +POSTHOOK: query: insert into intermediate_nonpart select * from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@intermediate_nonpart +POSTHOOK: Lineage: intermediate_nonpart.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: intermediate_nonpart.p SIMPLE [(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ] +PREHOOK: query: create table intermmediate_nonpart(key int, p int) tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@intermmediate_nonpart +POSTHOOK: query: create table intermmediate_nonpart(key int, p int) tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@intermmediate_nonpart +PREHOOK: query: insert into intermmediate_nonpart select * from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@intermmediate_nonpart +POSTHOOK: query: insert into intermmediate_nonpart select * from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@intermmediate_nonpart +POSTHOOK: Lineage: intermmediate_nonpart.key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: intermmediate_nonpart.p SIMPLE [(intermediate)intermediate.FieldSchema(name:p, type:int, comment:null), ] +PREHOOK: query: create table intermmediate(key int) partitioned by (p int) tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@intermmediate +POSTHOOK: query: create table intermmediate(key int) partitioned by (p int) tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@intermmediate +PREHOOK: query: insert into table intermmediate partition(p) select key, p from intermediate +PREHOOK: type: QUERY +PREHOOK: Input: default@intermediate +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +PREHOOK: Output: default@intermmediate +POSTHOOK: query: insert into table intermmediate partition(p) select key, p from intermediate +POSTHOOK: type: QUERY +POSTHOOK: Input: default@intermediate +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +POSTHOOK: Output: default@intermmediate@p=455 +POSTHOOK: Output: default@intermmediate@p=456 +POSTHOOK: Output: default@intermmediate@p=457 +POSTHOOK: Lineage: intermmediate PARTITION(p=455).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: intermmediate PARTITION(p=456).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +POSTHOOK: Lineage: intermmediate PARTITION(p=457).key SIMPLE [(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ] +PREHOOK: query: export table intermediate_nonpart to 'ql/test/data/exports/intermediate_nonpart' +PREHOOK: type: EXPORT +PREHOOK: Input: default@intermediate_nonpart +#### A masked pattern was here #### +POSTHOOK: query: export table intermediate_nonpart to 'ql/test/data/exports/intermediate_nonpart' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@intermediate_nonpart +#### A masked pattern was here #### +PREHOOK: query: export table intermmediate_nonpart to 'ql/test/data/exports/intermmediate_nonpart' +PREHOOK: type: EXPORT +PREHOOK: Input: default@intermmediate_nonpart +#### A masked pattern was here #### +POSTHOOK: query: export table intermmediate_nonpart to 'ql/test/data/exports/intermmediate_nonpart' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@intermmediate_nonpart +#### A masked pattern was here #### +PREHOOK: query: export table intermediate to 'ql/test/data/exports/intermediate_part' +PREHOOK: type: EXPORT +PREHOOK: Input: default@intermediate@p=455 +PREHOOK: Input: default@intermediate@p=456 +PREHOOK: Input: default@intermediate@p=457 +#### A masked pattern was here #### +POSTHOOK: query: export table intermediate to 'ql/test/data/exports/intermediate_part' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@intermediate@p=455 +POSTHOOK: Input: default@intermediate@p=456 +POSTHOOK: Input: default@intermediate@p=457 +#### A masked pattern was here #### +PREHOOK: query: export table intermmediate to 'ql/test/data/exports/intermmediate_part' +PREHOOK: type: EXPORT +PREHOOK: Input: default@intermmediate@p=455 +PREHOOK: Input: default@intermmediate@p=456 +PREHOOK: Input: default@intermmediate@p=457 +#### A masked pattern was here #### +POSTHOOK: query: export table intermmediate to 'ql/test/data/exports/intermmediate_part' +POSTHOOK: type: EXPORT +POSTHOOK: Input: default@intermmediate@p=455 +POSTHOOK: Input: default@intermmediate@p=456 +POSTHOOK: Input: default@intermmediate@p=457 +#### A masked pattern was here #### +PREHOOK: query: drop table intermediate_nonpart +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@intermediate_nonpart +PREHOOK: Output: default@intermediate_nonpart +POSTHOOK: query: drop table intermediate_nonpart +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@intermediate_nonpart +POSTHOOK: Output: default@intermediate_nonpart +PREHOOK: query: drop table intermmediate_part +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table intermmediate_part +POSTHOOK: type: DROPTABLE +PREHOOK: query: drop table intermmediate_nonpart +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@intermmediate_nonpart +PREHOOK: Output: default@intermmediate_nonpart +POSTHOOK: query: drop table intermmediate_nonpart +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@intermmediate_nonpart +POSTHOOK: Output: default@intermmediate_nonpart +PREHOOK: query: -- non-MM export to MM table, with and without partitions +drop table import0_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- non-MM export to MM table, with and without partitions +drop table import0_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import0_mm(key int, p int) tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import0_mm +POSTHOOK: query: create table import0_mm(key int, p int) tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import0_mm +PREHOOK: query: import table import0_mm from 'ql/test/data/exports/intermediate_nonpart' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import0_mm +POSTHOOK: query: import table import0_mm from 'ql/test/data/exports/intermediate_nonpart' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import0_mm +PREHOOK: query: select * from import0_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import0_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import0_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import0_mm +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import0_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import0_mm +PREHOOK: Output: default@import0_mm +POSTHOOK: query: drop table import0_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import0_mm +POSTHOOK: Output: default@import0_mm +PREHOOK: query: drop table import1_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table import1_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import1_mm(key int) partitioned by (p int) + stored as orc tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import1_mm +POSTHOOK: query: create table import1_mm(key int) partitioned by (p int) + stored as orc tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import1_mm +PREHOOK: query: import table import1_mm from 'ql/test/data/exports/intermediate_part' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import1_mm +POSTHOOK: query: import table import1_mm from 'ql/test/data/exports/intermediate_part' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import1_mm +POSTHOOK: Output: default@import1_mm@p=455 +POSTHOOK: Output: default@import1_mm@p=456 +POSTHOOK: Output: default@import1_mm@p=457 +PREHOOK: query: select * from import1_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import1_mm +PREHOOK: Input: default@import1_mm@p=455 +PREHOOK: Input: default@import1_mm@p=456 +PREHOOK: Input: default@import1_mm@p=457 +#### A masked pattern was here #### +POSTHOOK: query: select * from import1_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import1_mm +POSTHOOK: Input: default@import1_mm@p=455 +POSTHOOK: Input: default@import1_mm@p=456 +POSTHOOK: Input: default@import1_mm@p=457 +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import1_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import1_mm +PREHOOK: Output: default@import1_mm +POSTHOOK: query: drop table import1_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import1_mm +POSTHOOK: Output: default@import1_mm +PREHOOK: query: -- MM export into new MM table, non-part and part + +drop table import2_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- MM export into new MM table, non-part and part + +drop table import2_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: database:default +POSTHOOK: query: import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import2_mm +PREHOOK: query: desc import2_mm +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@import2_mm +POSTHOOK: query: desc import2_mm +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@import2_mm +key int +p int +PREHOOK: query: select * from import2_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import2_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import2_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import2_mm +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import2_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import2_mm +PREHOOK: Output: default@import2_mm +POSTHOOK: query: drop table import2_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import2_mm +POSTHOOK: Output: default@import2_mm +PREHOOK: query: drop table import3_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table import3_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: import table import3_mm from 'ql/test/data/exports/intermmediate_part' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: database:default +POSTHOOK: query: import table import3_mm from 'ql/test/data/exports/intermmediate_part' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import3_mm +POSTHOOK: Output: default@import3_mm@p=455 +POSTHOOK: Output: default@import3_mm@p=456 +POSTHOOK: Output: default@import3_mm@p=457 +PREHOOK: query: desc import3_mm +PREHOOK: type: DESCTABLE +PREHOOK: Input: default@import3_mm +POSTHOOK: query: desc import3_mm +POSTHOOK: type: DESCTABLE +POSTHOOK: Input: default@import3_mm +key int +p int + +# Partition Information +# col_name data_type comment + +p int +PREHOOK: query: select * from import3_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import3_mm +PREHOOK: Input: default@import3_mm@p=455 +PREHOOK: Input: default@import3_mm@p=456 +PREHOOK: Input: default@import3_mm@p=457 +#### A masked pattern was here #### +POSTHOOK: query: select * from import3_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import3_mm +POSTHOOK: Input: default@import3_mm@p=455 +POSTHOOK: Input: default@import3_mm@p=456 +POSTHOOK: Input: default@import3_mm@p=457 +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import3_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import3_mm +PREHOOK: Output: default@import3_mm +POSTHOOK: query: drop table import3_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import3_mm +POSTHOOK: Output: default@import3_mm +PREHOOK: query: -- MM export into existing MM table, non-part and partial part --- TODO# future --- ---create table exim_department ( dep_id int) stored as textfile; -#### A masked pattern was here #### ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; ---create database importer; ---use importer; ---create table exim_department ( dep_id int) stored as textfile; ---import from 'ql/test/data/exports/exim_department'; --- --- ---create table exim_department ( dep_id int) stored as textfile; ---load data local inpath "../../data/files/test.dat" into table exim_department; -#### A masked pattern was here #### ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; --- ---create database importer; ---use importer; --- ---set hive.security.authorization.enabled=true; ---import from 'ql/test/data/exports/exim_department'; +drop table import4_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- MM export into existing MM table, non-part and partial part +drop table import4_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import4_mm(key int, p int) tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import4_mm +POSTHOOK: query: create table import4_mm(key int, p int) tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import4_mm +PREHOOK: query: import table import4_mm from 'ql/test/data/exports/intermmediate_nonpart' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import4_mm +POSTHOOK: query: import table import4_mm from 'ql/test/data/exports/intermmediate_nonpart' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import4_mm +PREHOOK: query: select * from import4_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import4_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import4_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import4_mm +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import4_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import4_mm +PREHOOK: Output: default@import4_mm +POSTHOOK: query: drop table import4_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import4_mm +POSTHOOK: Output: default@import4_mm +PREHOOK: query: drop table import5_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table import5_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import5_mm(key int) partitioned by (p int) tblproperties('hivecommit'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import5_mm +POSTHOOK: query: create table import5_mm(key int) partitioned by (p int) tblproperties('hivecommit'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import5_mm +PREHOOK: query: import table import5_mm partition(p=455) from 'ql/test/data/exports/intermmediate_part' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import5_mm +POSTHOOK: query: import table import5_mm partition(p=455) from 'ql/test/data/exports/intermmediate_part' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import5_mm +POSTHOOK: Output: default@import5_mm@p=455 +PREHOOK: query: select * from import5_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import5_mm +PREHOOK: Input: default@import5_mm@p=455 +#### A masked pattern was here #### +POSTHOOK: query: select * from import5_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import5_mm +POSTHOOK: Input: default@import5_mm@p=455 +#### A masked pattern was here #### +97 455 +98 455 +PREHOOK: query: drop table import5_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import5_mm +PREHOOK: Output: default@import5_mm +POSTHOOK: query: drop table import5_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import5_mm +POSTHOOK: Output: default@import5_mm +PREHOOK: query: -- MM export into existing non-MM table, non-part and part +drop table import6_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: -- MM export into existing non-MM table, non-part and part --- TODO multi-insert, truncate +drop table import6_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import6_mm(key int, p int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import6_mm +POSTHOOK: query: create table import6_mm(key int, p int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import6_mm +PREHOOK: query: import table import6_mm from 'ql/test/data/exports/intermmediate_nonpart' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import6_mm +POSTHOOK: query: import table import6_mm from 'ql/test/data/exports/intermmediate_nonpart' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import6_mm +PREHOOK: query: select * from import6_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import6_mm +#### A masked pattern was here #### +POSTHOOK: query: select * from import6_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import6_mm +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import6_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import6_mm +PREHOOK: Output: default@import6_mm +POSTHOOK: query: drop table import6_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import6_mm +POSTHOOK: Output: default@import6_mm +PREHOOK: query: drop table import7_mm +PREHOOK: type: DROPTABLE +POSTHOOK: query: drop table import7_mm +POSTHOOK: type: DROPTABLE +PREHOOK: query: create table import7_mm(key int) partitioned by (p int) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@import7_mm +POSTHOOK: query: create table import7_mm(key int) partitioned by (p int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@import7_mm +PREHOOK: query: import table import7_mm from 'ql/test/data/exports/intermmediate_part' +PREHOOK: type: IMPORT +#### A masked pattern was here #### +PREHOOK: Output: default@import7_mm +POSTHOOK: query: import table import7_mm from 'ql/test/data/exports/intermmediate_part' +POSTHOOK: type: IMPORT +#### A masked pattern was here #### +POSTHOOK: Output: default@import7_mm +POSTHOOK: Output: default@import7_mm@p=455 +POSTHOOK: Output: default@import7_mm@p=456 +POSTHOOK: Output: default@import7_mm@p=457 +PREHOOK: query: select * from import7_mm order by key, p +PREHOOK: type: QUERY +PREHOOK: Input: default@import7_mm +PREHOOK: Input: default@import7_mm@p=455 +PREHOOK: Input: default@import7_mm@p=456 +PREHOOK: Input: default@import7_mm@p=457 +#### A masked pattern was here #### +POSTHOOK: query: select * from import7_mm order by key, p +POSTHOOK: type: QUERY +POSTHOOK: Input: default@import7_mm +POSTHOOK: Input: default@import7_mm@p=455 +POSTHOOK: Input: default@import7_mm@p=456 +POSTHOOK: Input: default@import7_mm@p=457 +#### A masked pattern was here #### +0 456 +10 456 +97 455 +98 455 +100 457 +103 457 +PREHOOK: query: drop table import7_mm +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@import7_mm +PREHOOK: Output: default@import7_mm +POSTHOOK: query: drop table import7_mm +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@import7_mm +POSTHOOK: Output: default@import7_mm +PREHOOK: query: -- TODO# multi-insert, truncate @@ -1962,37 +2466,7 @@ drop table intermediate PREHOOK: type: DROPTABLE PREHOOK: Input: default@intermediate PREHOOK: Output: default@intermediate -POSTHOOK: query: -- IMPORT - - - --- TODO# future --- ---create table exim_department ( dep_id int) stored as textfile; -#### A masked pattern was here #### ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; ---create database importer; ---use importer; ---create table exim_department ( dep_id int) stored as textfile; ---import from 'ql/test/data/exports/exim_department'; --- --- ---create table exim_department ( dep_id int) stored as textfile; ---load data local inpath "../../data/files/test.dat" into table exim_department; -#### A masked pattern was here #### ---export table exim_department to 'ql/test/data/exports/exim_department'; ---drop table exim_department; --- ---create database importer; ---use importer; --- ---set hive.security.authorization.enabled=true; ---import from 'ql/test/data/exports/exim_department'; - - - --- TODO multi-insert, truncate +POSTHOOK: query: -- TODO# multi-insert, truncate
