HIVE-17657 : export/import for MM tables is broken (Sergey Shelukhin, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbe8e619 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbe8e619 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbe8e619 Branch: refs/heads/branch-3 Commit: cbe8e61980729d8c5d6c86987c6026dc61f394c7 Parents: 33b283c Author: sergey <[email protected]> Authored: Mon May 7 13:20:51 2018 -0700 Committer: sergey <[email protected]> Committed: Tue Jun 5 12:17:16 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/exec/CopyTask.java | 49 ++------ .../apache/hadoop/hive/ql/exec/ExportTask.java | 10 +- .../apache/hadoop/hive/ql/exec/Utilities.java | 3 + .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 6 +- .../hive/ql/parse/ExportSemanticAnalyzer.java | 45 ++++--- .../hive/ql/parse/ImportSemanticAnalyzer.java | 34 ++---- .../hive/ql/parse/SemanticAnalyzerFactory.java | 2 +- .../ql/parse/repl/dump/PartitionExport.java | 9 +- .../hive/ql/parse/repl/dump/TableExport.java | 11 +- .../ql/parse/repl/dump/io/FileOperations.java | 84 +++++++++++-- .../apache/hadoop/hive/ql/plan/CopyWork.java | 18 --- .../apache/hadoop/hive/ql/plan/ExportWork.java | 60 +++++---- .../apache/hadoop/hive/ql/TestTxnCommands.java | 122 +++++++++++++++++-- .../org/apache/hadoop/hive/ql/TestTxnExIm.java | 9 +- .../hadoop/hive/ql/TxnCommandsBaseForTests.java | 3 +- ql/src/test/queries/clientpositive/mm_exim.q | 4 +- .../results/clientpositive/llap/mm_exim.q.out | 66 +++++++++- 17 files changed, 382 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index b0ec5ab..1a8e5e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -63,14 +63,25 @@ public class CopyTask extends Task<CopyWork> implements Serializable { protected int copyOnePath(Path fromPath, Path toPath) { FileSystem dstFs = null; try { - Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} " + fromPath); + Utilities.FILE_OP_LOGGER.trace("Copying data from {} to {} ", fromPath, toPath); console.printInfo("Copying data from " + fromPath.toString(), " to " + toPath.toString()); FileSystem srcFs = fromPath.getFileSystem(conf); dstFs = toPath.getFileSystem(conf); - FileStatus[] srcs = matchFilesOrDir(srcFs, fromPath, work.doSkipSourceMmDirs()); + FileStatus[] srcs = srcFs.globStatus(fromPath, new EximPathFilter()); + + // TODO: this is very brittle given that Hive supports nested directories in the tables. + // The caller should pass a flag explicitly telling us if the directories in the + // input are data, or parent of data. For now, retain this for backward compat. + if (srcs != null && srcs.length == 1 && srcs[0].isDirectory() + /*&& srcs[0].getPath().getName().equals(EximUtil.DATA_PATH_NAME) - still broken for partitions*/) { + Utilities.FILE_OP_LOGGER.debug( + "Recursing into a single child directory {}", srcs[0].getPath().getName()); + srcs = srcFs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); + } + if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { console.printError("No files matching path: " + fromPath.toString()); @@ -107,40 +118,6 @@ public class CopyTask extends Task<CopyWork> implements Serializable { } } - // Note: initially copied from LoadSemanticAnalyzer. - private static FileStatus[] matchFilesOrDir( - FileSystem fs, Path path, boolean isSourceMm) throws IOException { - if (!fs.exists(path)) return null; - if (!isSourceMm) return matchFilesOneDir(fs, path, null); - // Note: this doesn't handle list bucketing properly; neither does the original code. - FileStatus[] mmDirs = fs.listStatus(path, new AcidUtils.AnyIdDirFilter()); - if (mmDirs == null || mmDirs.length == 0) return null; - List<FileStatus> allFiles = new ArrayList<FileStatus>(); - for (FileStatus mmDir : mmDirs) { - if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Found source MM directory " + mmDir.getPath()); - } - matchFilesOneDir(fs, mmDir.getPath(), allFiles); - } - return allFiles.toArray(new FileStatus[allFiles.size()]); - } - - private static FileStatus[] matchFilesOneDir( - FileSystem fs, Path path, List<FileStatus> result) throws IOException { - FileStatus[] srcs = fs.globStatus(path, new EximPathFilter()); - if (srcs != null && srcs.length == 1) { - if (srcs[0].isDirectory()) { - srcs = fs.listStatus(srcs[0].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - } - } - if (result != null && srcs != null) { - for (int i = 0; i < srcs.length; ++i) { - result.add(srcs[i]); - } - } - return srcs; - } - private static final class EximPathFilter implements PathFilter { @Override public boolean accept(Path p) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java index e3af4f9..3c6a606 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java @@ -47,15 +47,13 @@ public class ExportTask extends Task<ExportWork> implements Serializable { protected int execute(DriverContext driverContext) { try { // Also creates the root directory - TableExport.Paths exportPaths = - new TableExport.Paths(work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), - conf, false); + TableExport.Paths exportPaths = new TableExport.Paths( + work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false); Hive db = getHive(); LOG.debug("Exporting data to: {}", exportPaths.exportRootDir()); work.acidPostProcess(db); - TableExport tableExport = new TableExport( - exportPaths, work.getTableSpec(), work.getReplicationSpec(), db, null, conf - ); + TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(), + work.getReplicationSpec(), db, null, conf, work.getMmContext()); if (!tableExport.write()) { throw new SemanticException(ErrorMsg.EXIM_FOR_NON_NATIVE.getMsg()); } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 31846a3..406bea0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import java.beans.DefaultPersistenceDelegate; import java.beans.Encoder; import java.beans.Expression; @@ -72,6 +73,7 @@ import java.util.regex.Pattern; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.codec.binary.Base64; @@ -136,6 +138,7 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 88d352b..ccdf04a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -288,7 +289,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) { tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList)); } - new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write(); + MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle); + new TableExport( + exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf, mmCtx).write(); + replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType()); } catch (InvalidTableException te) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index d3c62a2..4a366a9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -19,12 +19,17 @@ package org.apache.hadoop.hive.ql.parse; +import java.util.Set; + +import javax.annotation.Nullable; + import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -32,15 +37,14 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.plan.ExportWork; - -import javax.annotation.Nullable; -import java.util.Set; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; /** * ExportSemanticAnalyzer. * */ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { + private boolean isMmExport = false; ExportSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -48,7 +52,9 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { @Override public void analyzeInternal(ASTNode ast) throws SemanticException { - rootTasks.add(analyzeExport(ast, null, db, conf, inputs, outputs)); + Task<ExportWork> task = analyzeExport(ast, null, db, conf, inputs, outputs); + isMmExport = task.getWork().getMmContext() != null; + rootTasks.add(task); } /** * @param acidTableName - table name in db.table format; not NULL if exporting Acid table @@ -80,12 +86,10 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { try { ts = new TableSpec(db, conf, (ASTNode) tableTree, false, true); - } catch (SemanticException sme){ - if ((replicationSpec.isInReplicationScope()) && - ((sme.getCause() instanceof InvalidTableException) - || (sme instanceof Table.ValidationFailureSemanticException) - ) - ){ + } catch (SemanticException sme) { + if (!replicationSpec.isInReplicationScope()) throw sme; + if ((sme.getCause() instanceof InvalidTableException) + || (sme instanceof Table.ValidationFailureSemanticException)) { // If we're in replication scope, it's possible that we're running the export long after // the table was dropped, so the table not existing currently or being a different kind of // table is not an error - it simply means we should no-op, and let a future export @@ -101,15 +105,26 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { // All parsing is done, we're now good to start the export process TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf, false); - TableExport tableExport = new TableExport(exportPaths, ts, replicationSpec, db, null, conf); - TableExport.AuthEntities authEntities = tableExport.getAuthEntities(); + // Note: this tableExport is actually never used other than for auth, and another one is + // created when the task is executed. So, we don't care about the correct MM state here. + TableExport.AuthEntities authEntities = new TableExport( + exportPaths, ts, replicationSpec, db, null, conf, null).getAuthEntities(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); String exportRootDirName = tmpPath; + MmContext mmCtx = MmContext.createIfNeeded(ts == null ? null : ts.tableHandle); + + Utilities.FILE_OP_LOGGER.debug("Exporting table {}: MM context {}", + ts == null ? null : ts.tableName, mmCtx); // Configure export work - ExportWork exportWork = - new ExportWork(exportRootDirName, ts, replicationSpec, ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName); + ExportWork exportWork = new ExportWork(exportRootDirName, ts, replicationSpec, + ErrorMsg.INVALID_PATH.getMsg(ast), acidTableName, mmCtx); // Create an export task and add it as a root task - return TaskFactory.get(exportWork); + return TaskFactory.get(exportWork); + } + + @Override + public boolean hasTransactionalInQuery() { + return isMmExport; // Full ACID export goes thru UpdateDelete analyzer. } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 1a3cef9..cc7f0d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -384,7 +384,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, - Long writeId, int stmtId, boolean isSourceMm) { + Long writeId, int stmtId) { assert table != null; assert table.getParameters() != null; Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); @@ -425,9 +425,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (replicationSpec.isInReplicationScope()) { copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(dataPath, destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false)); } LoadTableDesc loadTableWork = new LoadTableDesc( @@ -482,7 +480,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, ImportTableDesc tblDesc, Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, - EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId, boolean isSourceMm) + EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0); if (tblDesc.isExternal() && tblDesc.getLocation() == null) { @@ -519,9 +517,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { copyTask = ReplCopyTask.getLoadCopyTask( replicationSpec, new Path(srcLocation), destPath, x.getConf()); } else { - CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); - cw.setSkipSourceMmDirs(isSourceMm); - copyTask = TaskFactory.get(cw); + copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false)); } Task<?> addPartTask = TaskFactory.get( @@ -832,8 +828,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws HiveException, IOException, MetaException { - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); - if (table != null) { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); @@ -843,7 +837,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { throw new SemanticException( ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec))); @@ -855,8 +849,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Path tgtPath = new Path(table.getDataLocation().toString()); FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x.getLOG()); - loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId, - isSourceMm); + loadTable(fromURI, table, false, tgtPath, replicationSpec, x, writeId, stmtId); } // Set this to read because we can't overwrite any existing partitions x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -875,7 +868,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, - replicationSpec, x, writeId, stmtId, isSourceMm)); + replicationSpec, x, writeId, stmtId)); } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); @@ -893,7 +886,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf()); checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x.getLOG()); t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, - writeId, stmtId, isSourceMm)); + writeId, stmtId)); } } x.getTasks().add(t); @@ -930,7 +923,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { throws HiveException, URISyntaxException, IOException, MetaException { Task<?> dropTblTask = null; - final boolean isSourceMm = AcidUtils.isInsertOnlyTable(tblDesc.getTblProps()); WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK; // Normally, on import, trying to create a table or a partition in a db that does not yet exist @@ -1014,14 +1006,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( - addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } } } else { x.getLOG().debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId)); } } @@ -1066,7 +1058,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (ptn == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec()); } @@ -1083,7 +1075,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (replicationSpec.allowReplacementInto(ptn.getParameters())){ if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( - fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); + fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); } else { x.getTasks().add(alterSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); @@ -1109,7 +1101,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()), - replicationSpec, x, writeId, stmtId, isSourceMm); + replicationSpec, x, writeId, stmtId); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 0f42ab8..088b5cf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -216,7 +216,7 @@ public final class SemanticAnalyzerFactory { case HiveParser.TOK_LOAD: return new LoadSemanticAnalyzer(queryState); case HiveParser.TOK_EXPORT: - if(UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { + if (UpdateDeleteSemanticAnalyzer.isAcidExport(tree)) { return new UpdateDeleteSemanticAnalyzer(queryState); } return new ExportSemanticAnalyzer(queryState); http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 70eb750..d73fc4f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.PartitionIterable; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +44,8 @@ import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths; * it has a blocking queue that stores partitions to be dumped via a producer thread. * it has a worker thread pool that reads of the queue to perform the various tasks. */ +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; @@ -50,16 +53,18 @@ class PartitionExport { private final HiveConf hiveConf; private final int nThreads; private final SessionState callersSession; + private final MmContext mmCtx; private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue<Partition> queue; PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, - HiveConf hiveConf) { + HiveConf hiveConf, MmContext mmCtx) { this.paths = paths; this.partitionIterable = partitionIterable; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); this.queue = new ArrayBlockingQueue<>(2 * nThreads); this.callersSession = SessionState.get(); @@ -106,7 +111,7 @@ class PartitionExport { List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(), forReplicationSpec, hiveConf); Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf) + new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx) .export(forReplicationSpec); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index e801a64..20ff23a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +51,8 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; +// TODO: this object is created once to call one method and then immediately destroyed. +// So it's basically just a roundabout way to pass arguments to a static method. Simplify? public class TableExport { private static final Logger logger = LoggerFactory.getLogger(TableExport.class); @@ -59,9 +62,10 @@ public class TableExport { private final String distCpDoAsUser; private final HiveConf conf; private final Paths paths; + private final MmContext mmCtx; public TableExport(Paths paths, TableSpec tableSpec, ReplicationSpec replicationSpec, Hive db, - String distCpDoAsUser, HiveConf conf) { + String distCpDoAsUser, HiveConf conf, MmContext mmCtx) { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() && replicationSpec.isInReplicationScope()) @@ -76,6 +80,7 @@ public class TableExport { this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; this.paths = paths; + this.mmCtx = mmCtx; } public boolean write() throws SemanticException { @@ -147,13 +152,13 @@ public class TableExport { throw new IllegalStateException("partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec); } else { List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(), replicationSpec, conf); // this is the data copy - new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf) + new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx) .export(replicationSpec); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 690498f..b61a945 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; +import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.security.auth.login.LoginException; + import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; +//TODO: this object is created once to call one method and then immediately destroyed. +//So it's basically just a roundabout way to pass arguments to a static method. Simplify? public class FileOperations { private static Logger logger = LoggerFactory.getLogger(FileOperations.class); private final List<Path> dataPathList; @@ -45,13 +52,15 @@ public class FileOperations { private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; + private final MmContext mmCtx; - public FileOperations(List<Path> dataPathList, Path exportRootDataDir, - String distCpDoAsUser, HiveConf hiveConf) throws IOException { + public FileOperations(List<Path> dataPathList, Path exportRootDataDir, String distCpDoAsUser, + HiveConf hiveConf, MmContext mmCtx) throws IOException { this.dataPathList = dataPathList; this.exportRootDataDir = exportRootDataDir; this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; + this.mmCtx = mmCtx; if ((dataPathList != null) && !dataPathList.isEmpty()) { dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); } else { @@ -72,17 +81,59 @@ public class FileOperations { * This writes the actual data in the exportRootDataDir from the source. */ private void copyFiles() throws IOException, LoginException { - for (Path dataPath : dataPathList) { - FileStatus[] fileStatuses = - LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath); - List<Path> srcPaths = new ArrayList<>(); - for (FileStatus fileStatus : fileStatuses) { - srcPaths.add(fileStatus.getPath()); + if (mmCtx == null) { + for (Path dataPath : dataPathList) { + copyOneDataPath(dataPath, exportRootDataDir); } - new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); + } else { + copyMmPath(); + } + } + + private void copyOneDataPath(Path fromPath, Path toPath) throws IOException, LoginException { + FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, fromPath); + List<Path> srcPaths = new ArrayList<>(); + for (FileStatus fileStatus : fileStatuses) { + srcPaths.add(fileStatus.getPath()); } + + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(toPath, srcPaths); } + private void copyMmPath() throws LoginException, IOException { + assert dataPathList.size() == 1; + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList(hiveConf, mmCtx.getFqTableName()); + Path fromPath = dataFileSystem.makeQualified(dataPathList.get(0)); + List<Path> validPaths = getMmValidPaths(ids, fromPath); + String fromPathStr = fromPath.toString(); + if (!fromPathStr.endsWith(Path.SEPARATOR)) { + fromPathStr += Path.SEPARATOR; + } + for (Path validPath : validPaths) { + // Export valid directories with a modified name so they don't look like bases/deltas. + // We could also dump the delta contents all together and rename the files if names collide. + String mmChildPath = "export_old_" + validPath.toString().substring(fromPathStr.length()); + Path destPath = new Path(exportRootDataDir, mmChildPath); + exportFileSystem.mkdirs(destPath); + copyOneDataPath(validPath, destPath); + } + } + + private List<Path> getMmValidPaths(ValidWriteIdList ids, Path fromPath) throws IOException { + Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", fromPath); + AcidUtils.Directory acidState = AcidUtils.getAcidState(fromPath, hiveConf, ids); + List<Path> validPaths = new ArrayList<>(); + Path base = acidState.getBaseDirectory(); + if (base != null) { + validPaths.add(base); + } + for (ParsedDelta pd : acidState.getCurrentDirectories()) { + validPaths.add(pd.getPath()); + } + return validPaths; + } + + /** * This needs the root data directory to which the data needs to be exported to. * The data export here is a list of files either in table/partition that are written to the _files @@ -90,8 +141,19 @@ public class FileOperations { */ private void exportFilesAsList() throws SemanticException, IOException { try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + if (mmCtx != null) { + assert dataPathList.size() == 1; + Path dataPath = dataPathList.get(0); + ValidWriteIdList ids = AcidUtils.getTableValidWriteIdList( + hiveConf, mmCtx.getFqTableName()); + List<Path> validPaths = getMmValidPaths(ids, dataPath); + for (Path mmPath : validPaths) { + writeFilesList(listFilesInDir(mmPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + } else { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java index c0e4a43..018983f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CopyWork.java @@ -33,15 +33,10 @@ public class CopyWork implements Serializable { private Path[] fromPath; private Path[] toPath; private boolean errorOnSrcEmpty; - private boolean isSkipMmDirs = false; public CopyWork() { } - public CopyWork(final Path fromPath, final Path toPath) { - this(fromPath, toPath, true); - } - public CopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { this(new Path[] { fromPath }, new Path[] { toPath }); this.setErrorOnSrcEmpty(errorOnSrcEmpty); @@ -92,17 +87,4 @@ public class CopyWork implements Serializable { public boolean isErrorOnSrcEmpty() { return errorOnSrcEmpty; } - - /** - * Whether the copy should ignore MM directories in the source, and copy their content to - * destination directly, rather than copying the directories themselves. - * */ - public void setSkipSourceMmDirs(boolean isMm) { - this.isSkipMmDirs = isMm; - } - - public boolean doSkipSourceMmDirs() { - return isSkipMmDirs ; - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java index 72ce798..d91569e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ExportWork.java @@ -17,39 +17,65 @@ */ package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; + +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; - @Explain(displayName = "Export Work", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) public class ExportWork implements Serializable { - private Logger LOG = LoggerFactory.getLogger(ExportWork.class); + private static Logger LOG = LoggerFactory.getLogger(ExportWork.class); private static final long serialVersionUID = 1L; + public final static class MmContext { + private final String fqTableName; + + private MmContext(String fqTableName) { + this.fqTableName = fqTableName; + } + + @Override + public String toString() { + return "[" + fqTableName + "]"; + } + + public static MmContext createIfNeeded(Table t) { + if (t == null) return null; + if (!AcidUtils.isInsertOnlyTable(t.getParameters())) return null; + return new MmContext(AcidUtils.getFullTableName(t.getDbName(), t.getTableName())); + } + + public String getFqTableName() { + return fqTableName; + } + } + private final String exportRootDirName; private TableSpec tableSpec; private ReplicationSpec replicationSpec; private String astRepresentationForErrorMsg; - private String qualifiedTableName; + private String acidFqTableName; + private final MmContext mmContext; /** - * @param qualifiedTable if exporting Acid table, this is temp table - null otherwise + * @param acidFqTableName if exporting Acid table, this is temp table - null otherwise */ public ExportWork(String exportRootDirName, TableSpec tableSpec, ReplicationSpec replicationSpec, - String astRepresentationForErrorMsg, String qualifiedTable) { + String astRepresentationForErrorMsg, String acidFqTableName, MmContext mmContext) { this.exportRootDirName = exportRootDirName; this.tableSpec = tableSpec; this.replicationSpec = replicationSpec; this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; - this.qualifiedTableName = qualifiedTable; + this.mmContext = mmContext; + this.acidFqTableName = acidFqTableName; } public String getExportRootDir() { @@ -60,24 +86,16 @@ public class ExportWork implements Serializable { return tableSpec; } - public void setTableSpec(TableSpec tableSpec) { - this.tableSpec = tableSpec; - } - public ReplicationSpec getReplicationSpec() { return replicationSpec; } - public void setReplicationSpec(ReplicationSpec replicationSpec) { - this.replicationSpec = replicationSpec; - } - public String getAstRepresentationForErrorMsg() { return astRepresentationForErrorMsg; } - public void setAstRepresentationForErrorMsg(String astRepresentationForErrorMsg) { - this.astRepresentationForErrorMsg = astRepresentationForErrorMsg; + public MmContext getMmContext() { + return mmContext; } /** @@ -88,10 +106,10 @@ public class ExportWork implements Serializable { * for more info. */ public void acidPostProcess(Hive db) throws HiveException { - if(qualifiedTableName != null) { - LOG.info("Swapping export of " + tableSpec.tableName + " to " + qualifiedTableName + + if (acidFqTableName != null) { + LOG.info("Swapping export of " + tableSpec.tableName + " to " + acidFqTableName + " using partSpec=" + tableSpec.partSpec); - tableSpec = new TableSpec(db, qualifiedTableName, tableSpec.partSpec, true); + tableSpec = new TableSpec(db, acidFqTableName, tableSpec.partSpec, true); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java index 6faba42..3e2784b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java @@ -17,9 +17,24 @@ */ package org.apache.hadoop.hive.ql; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.LockState; @@ -47,13 +62,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileOutputStream; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.TimeUnit; - /** * The LockManager is not ready, but for no-concurrency straight-line path we can * test AC=true, and AC=false with commit/rollback/exception and test resulting data. @@ -152,6 +160,106 @@ public class TestTxnCommands extends TxnCommandsBaseForTests { Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1); } + @Test + public void testMmExim() throws Exception { + String tableName = "mm_table", importName = tableName + "_import"; + runStatementOnDriver("drop table if exists " + tableName); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')", + tableName)); + + // Regular insert: export some MM deltas, then import into a new table. + int[][] rows1 = {{1,2},{3,4}}; + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + runStatementOnDriver(String.format("insert into %s (a,b) %s", + tableName, makeValuesClause(rows1))); + IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf); + org.apache.hadoop.hive.metastore.api.Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(hiveConf); + Path exportPath = new Path(table.getSd().getLocation() + "_export"); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + List<String> paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 2); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + org.apache.hadoop.hive.metastore.api.Table imported = msClient.getTable("default", importName); + Assert.assertEquals(imported.toString(), "insert_only", + imported.getParameters().get("transactional_properties")); + Path importPath = new Path(imported.getSd().getLocation()); + FileStatus[] stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + Assert.assertEquals(Arrays.toString(stat), 1, stat.length); + assertIsDelta(stat[0]); + List<String> allData = stringifyValues(rows1); + allData.addAll(stringifyValues(rows1)); + allData.sort(null); + Collections.sort(allData); + List<String> rs = runStatementOnDriver( + String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + + // Do insert overwrite to create some invalid deltas, and import into a non-MM table. + int[][] rows2 = {{5,6},{7,8}}; + runStatementOnDriver(String.format("insert overwrite table %s %s", + tableName, makeValuesClause(rows2))); + fs.delete(exportPath, true); + runStatementOnDriver(String.format("export table %s to '%s'", tableName, exportPath)); + paths = listPathsRecursive(fs, exportPath); + verifyMmExportPaths(paths, 1); + runStatementOnDriver(String.format("create table %s (a int, b int) stored as orc " + + "TBLPROPERTIES ('transactional'='false')", importName)); + runStatementOnDriver(String.format("import table %s from '%s'", importName, exportPath)); + imported = msClient.getTable("default", importName); + Assert.assertNull(imported.toString(), imported.getParameters().get("transactional")); + Assert.assertNull(imported.toString(), + imported.getParameters().get("transactional_properties")); + importPath = new Path(imported.getSd().getLocation()); + stat = fs.listStatus(importPath, AcidUtils.hiddenFileFilter); + allData = stringifyValues(rows2); + Collections.sort(allData); + rs = runStatementOnDriver(String.format("select a,b from %s order by a,b", importName)); + Assert.assertEquals("After import: " + rs, allData, rs); + runStatementOnDriver("drop table if exists " + importName); + runStatementOnDriver("drop table if exists " + tableName); + msClient.close(); + } + + private void assertIsDelta(FileStatus stat) { + Assert.assertTrue(stat.toString(), + stat.getPath().getName().startsWith(AcidUtils.DELTA_PREFIX)); + } + + private void verifyMmExportPaths(List<String> paths, int deltasOrBases) { + // 1 file, 1 dir for each, for now. Plus export "data" dir. + // This could be changed to a flat file list later. + Assert.assertEquals(paths.toString(), 2 * deltasOrBases + 1, paths.size()); + // No confusing directories in export. + for (String path : paths) { + Assert.assertFalse(path, path.startsWith(AcidUtils.DELTA_PREFIX)); + Assert.assertFalse(path, path.startsWith(AcidUtils.BASE_PREFIX)); + } + } + + private List<String> listPathsRecursive(FileSystem fs, Path path) throws IOException { + List<String> paths = new ArrayList<>(); + LinkedList<Path> queue = new LinkedList<>(); + queue.add(path); + while (!queue.isEmpty()) { + Path next = queue.pollFirst(); + FileStatus[] stats = fs.listStatus(next, AcidUtils.hiddenFileFilter); + for (FileStatus stat : stats) { + Path child = stat.getPath(); + paths.add(child.toString()); + if (stat.isDirectory()) { + queue.add(child); + } + } + } + return paths; + } + + /** * add tests for all transitions - AC=t, AC=t, AC=f, commit (for example) * @throws Exception http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java index 91f2d13..8d3ab77 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java @@ -477,6 +477,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/ } private void testMM(boolean existingTable, boolean isSourceMM) throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1,2}, {3, 4}, {5, 6}}; runStatementOnDriver("drop table if exists T"); @@ -500,9 +501,10 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/ //verify that we are indeed doing an Acid write (import) rs = runStatementOnDriver("select INPUT__FILE__NAME from T order by INPUT__FILE__NAME"); Assert.assertEquals(3, rs.size()); - Assert.assertTrue(rs.get(0).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(1).endsWith("t/delta_0000001_0000001_0000/000000_0")); - Assert.assertTrue(rs.get(2).endsWith("t/delta_0000001_0000001_0000/000000_0")); + for (String s : rs) { + Assert.assertTrue(s, s.contains("/delta_0000001_0000001_0000/")); + Assert.assertTrue(s, s.endsWith("/000000_0")); + } } private void checkResult(String[][] expectedResult, String query, boolean isVectorized, String msg) throws Exception{ @@ -516,6 +518,7 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/ @Test public void testMMExportAborted() throws Exception { HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true); + hiveConf.setBoolean("mapred.input.dir.recursive", true); int[][] data = {{1, 2}, {3, 4}, {5, 6}}; int[][] dataAbort = {{10, 2}}; runStatementOnDriver("drop table if exists T"); http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java index 25a63d8..8a55d8c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java @@ -217,7 +217,8 @@ public abstract class TxnCommandsBaseForTests { void checkExpected(List<String> rs, String[][] expected, String msg, Logger LOG, boolean checkFileName) { LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); logResult(LOG, rs); - Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + Assert.assertEquals(testName.getMethodName() + ": " + msg + "; " + rs, + expected.length, rs.size()); //verify data and layout for(int i = 0; i < expected.length; i++) { Assert.assertTrue("Actual line (data) " + i + " data: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/queries/clientpositive/mm_exim.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mm_exim.q b/ql/src/test/queries/clientpositive/mm_exim.q index b0c030d..debcc24 100644 --- a/ql/src/test/queries/clientpositive/mm_exim.q +++ b/ql/src/test/queries/clientpositive/mm_exim.q @@ -57,13 +57,13 @@ drop table import1_mm; drop table import2_mm; import table import2_mm from 'ql/test/data/exports/intermmediate_nonpart'; -desc import2_mm; +desc formatted import2_mm; select * from import2_mm order by key, p; drop table import2_mm; drop table import3_mm; import table import3_mm from 'ql/test/data/exports/intermmediate_part'; -desc import3_mm; +desc formatted import3_mm; select * from import3_mm order by key, p; drop table import3_mm; http://git-wip-us.apache.org/repos/asf/hive/blob/cbe8e619/ql/src/test/results/clientpositive/llap/mm_exim.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/mm_exim.q.out b/ql/src/test/results/clientpositive/llap/mm_exim.q.out index 8a43d0f..8dfc738 100644 --- a/ql/src/test/results/clientpositive/llap/mm_exim.q.out +++ b/ql/src/test/results/clientpositive/llap/mm_exim.q.out @@ -292,14 +292,42 @@ POSTHOOK: type: IMPORT #### A masked pattern was here #### POSTHOOK: Output: database:default POSTHOOK: Output: default@import2_mm -PREHOOK: query: desc import2_mm +PREHOOK: query: desc formatted import2_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import2_mm -POSTHOOK: query: desc import2_mm +POSTHOOK: query: desc formatted import2_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import2_mm +# col_name data_type comment key int p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + bucketing_version 2 + numFiles 3 + numRows 6 + rawDataSize 37 + totalSize 43 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import2_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import2_mm @@ -338,18 +366,46 @@ POSTHOOK: Output: default@import3_mm POSTHOOK: Output: default@import3_mm@p=455 POSTHOOK: Output: default@import3_mm@p=456 POSTHOOK: Output: default@import3_mm@p=457 -PREHOOK: query: desc import3_mm +PREHOOK: query: desc formatted import3_mm PREHOOK: type: DESCTABLE PREHOOK: Input: default@import3_mm -POSTHOOK: query: desc import3_mm +POSTHOOK: query: desc formatted import3_mm POSTHOOK: type: DESCTABLE POSTHOOK: Input: default@import3_mm +# col_name data_type comment key int -p int # Partition Information # col_name data_type comment p int + +# Detailed Table Information +Database: default +#### A masked pattern was here #### +Retention: 0 +#### A masked pattern was here #### +Table Type: MANAGED_TABLE +Table Parameters: + bucketing_version 2 + numFiles 3 + numPartitions 3 + numRows 6 + rawDataSize 13 + totalSize 19 + transactional true + transactional_properties insert_only +#### A masked pattern was here #### + +# Storage Information +SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe +InputFormat: org.apache.hadoop.mapred.TextInputFormat +OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat +Compressed: No +Num Buckets: -1 +Bucket Columns: [] +Sort Columns: [] +Storage Desc Params: + serialization.format 1 PREHOOK: query: select * from import3_mm order by key, p PREHOOK: type: QUERY PREHOOK: Input: default@import3_mm
