Repository: hive
Updated Branches:
  refs/heads/branch-2.0 af4766da3 -> 24fb45df7
  refs/heads/master 154850124 -> 85ffd22af
HIVE-13463 : Fix ImportSemanticAnalyzer to allow for different src/dst filesystems (Zach York, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/85ffd22a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/85ffd22a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/85ffd22a Branch: refs/heads/master Commit: 85ffd22af8ab460f545b137aff0592984aa4b4f7 Parents: 1548501 Author: Sergey Shelukhin <[email protected]> Authored: Tue Apr 26 15:17:57 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Tue Apr 26 15:17:57 2016 -0700 ---------------------------------------------------------------------- .../hive/ql/parse/ImportSemanticAnalyzer.java | 25 ++++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/85ffd22a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index fbf1a3c..500c7ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -318,9 +318,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return tblDesc; } - private Task<?> loadTable(URI fromURI, Table table, boolean replace) { + private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) { Path dataPath = new Path(fromURI.toString(), "data"); - Path tmpPath = ctx.getExternalTmpPath(new Path(fromURI)); + Path tmpPath = ctx.getExternalTmpPath(tgtPath); Task<?> copyTask = TaskFactory.get(new CopyWork(dataPath, tmpPath, false), conf); LoadTableDesc loadTableWork = 
new LoadTableDesc(tmpPath, @@ -390,7 +390,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition " + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); - Path tmpPath = ctx.getExternalTmpPath(new Path(fromURI)); + Path tgtLocation = new Path(partSpec.getLocation()); + Path tmpPath = ctx.getExternalTmpPath(tgtLocation); Task<?> copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), tmpPath, false), conf); Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(), @@ -431,7 +432,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { tgtPath = new Path(tblDesc.getLocation(), Warehouse.makePartPath(partSpec.getPartSpec())); } - checkTargetLocationEmpty(fs, tgtPath, replicationSpec); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); partSpec.setLocation(tgtPath.toString()); } @@ -707,8 +709,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } else { LOG.debug("table non-partitioned"); // ensure if destination is not empty only for regular import - checkTargetLocationEmpty(fs, new Path(table.getDataLocation().toString()), replicationSpec); - loadTable(fromURI, table, false); + Path tgtPath = new Path(table.getDataLocation().toString()); + FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec); + loadTable(fromURI, table, false, tgtPath); } // Set this to read because we can't overwrite any existing partitions outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK)); @@ -741,8 +745,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } else { tablePath = wh.getTablePath(parentDb, tblDesc.getTableName()); } - checkTargetLocationEmpty(fs, tablePath, replicationSpec); - t.addDependentTask(loadTable(fromURI, table, false)); + FileSystem tgtFs = 
FileSystem.get(tablePath.toUri(), conf); + checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec); + t.addDependentTask(loadTable(fromURI, table, false, tablePath)); } } rootTasks.add(t); @@ -813,7 +818,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } } else { LOG.debug("adding dependent CopyWork/MoveWork for table"); - t.addDependentTask(loadTable(fromURI, table, true)); + t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()))); } } if (dr == null){ @@ -868,7 +873,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return; // silently return, table is newer than our replacement. } if (!replicationSpec.isMetadataOnly()) { - loadTable(fromURI, table, true); // repl-imports are replace-into + loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into } else { rootTasks.add(alterTableTask(tblDesc)); }
