HIVE-13463 : Fix ImportSemanticAnalyzer to allow for different src/dst filesystems (Zach York, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/24fb45df Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/24fb45df Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/24fb45df Branch: refs/heads/branch-2.0 Commit: 24fb45df7d70af85529910cef235b6ee71f22b73 Parents: af4766d Author: Sergey Shelukhin <[email protected]> Authored: Tue Apr 26 15:17:57 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Tue Apr 26 15:18:11 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/parse/ImportSemanticAnalyzer.java | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/24fb45df/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 88c4b95..6080c07 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -317,9 +317,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return tblDesc; } - private Task<?> loadTable(URI fromURI, Table table, boolean replace) { + private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) { Path dataPath = new Path(fromURI.toString(), "data"); - Path tmpPath = ctx.getExternalTmpPath(new Path(fromURI)); + Path tmpPath = ctx.getExternalTmpPath(tgtPath); Task<?> copyTask = TaskFactory.get(new CopyWork(dataPath, tmpPath, false), conf); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, @@ -389,7 +389,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); - Path tmpPath = ctx.getExternalTmpPath(new Path(fromURI)); + Path tgtLocation = new Path(partSpec.getLocation()); + Path tmpPath = ctx.getExternalTmpPath(tgtLocation); Task<?> copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), tmpPath, false), conf); Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(), @@ -430,7 +431,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { tgtPath = new Path(tblDesc.getLocation(), Warehouse.makePartPath(partSpec.getPartSpec())); } - checkTargetLocationEmpty(fs, tgtPath, replicationSpec); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); partSpec.setLocation(tgtPath.toString()); } @@ -706,8 +708,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } else { LOG.debug("table non-partitioned"); // ensure if destination is not empty only for regular import - checkTargetLocationEmpty(fs, new Path(table.getDataLocation().toString()), replicationSpec); - loadTable(fromURI, table, false); + Path tgtPath = new Path(table.getDataLocation().toString()); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); + loadTable(fromURI, table, false, tgtPath); } // Set this to read because we can't overwrite any existing partitions outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -740,8 +744,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } else { tablePath = wh.getTablePath(parentDb, tblDesc.getTableName()); } - checkTargetLocationEmpty(fs, tablePath, replicationSpec); - t.addDependentTask(loadTable(fromURI, table, false)); + FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec); + t.addDependentTask(loadTable(fromURI, table, false, tablePath)); } } rootTasks.add(t); @@ -812,7 +817,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } } else { LOG.debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()))); } } if (dr == null){ @@ -867,7 +872,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return; // silently return, table is newer than our replacement. } if (!replicationSpec.isMetadataOnly()) { - loadTable(fromURI, table, true); // repl-imports are replace-into + loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into } else { rootTasks.add(alterTableTask(tblDesc)); }
