Repository: spark Updated Branches: refs/heads/branch-2.4 b6e4aca0b -> d64b35588
[SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port ## What changes were proposed in this pull request? LOAD DATA INPATH didn't work if the defaultFS included a port for hdfs. Handling this just requires a small change to use the correct URI constructor. ## How was this patch tested? Added a unit test, ran all tests via jenkins Closes #22733 from squito/SPARK-25738. Authored-by: Imran Rashid <[email protected]> Signed-off-by: Marcelo Vanzin <[email protected]> (cherry picked from commit fdaa99897ac8755938d031896ae0eefb46ce7107) Signed-off-by: Marcelo Vanzin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d64b3558 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d64b3558 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d64b3558 Branch: refs/heads/branch-2.4 Commit: d64b355886b2dd3486729d51bac1a17abba3f64a Parents: b6e4aca Author: Imran Rashid <[email protected]> Authored: Mon Oct 15 18:34:30 2018 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Mon Oct 15 18:34:47 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/command/tables.scala | 11 +++++++---- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8 ++++++++ 2 files changed, 15 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d64b3558/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 2eca1c4..64831e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -306,7 +306,8 @@ case class LoadDataCommand( val loadPath = { if (isLocal) { val localFS = FileContext.getLocalFSFileContext() - makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), new Path(path)) + LoadDataCommand.makeQualified(FsConstants.LOCAL_FS_URI, localFS.getWorkingDirectory(), + new Path(path)) } else { val loadPath = new Path(path) // Follow Hive's behavior: @@ -323,7 +324,7 @@ case class LoadDataCommand( // by considering the wild card scenario in mind.as per old logic query param is // been considered while creating URI instance and if path contains wild card char '?' // the remaining charecters after '?' will be removed while forming URI instance - makeQualified(defaultFS, uriPath, loadPath) + LoadDataCommand.makeQualified(defaultFS, uriPath, loadPath) } } val fs = loadPath.getFileSystem(sparkSession.sessionState.newHadoopConf()) @@ -363,7 +364,9 @@ case class LoadDataCommand( CommandUtils.updateTableStats(sparkSession, targetTable) Seq.empty[Row] } +} +object LoadDataCommand { /** * Returns a qualified path object. Method ported from org.apache.hadoop.fs.Path class. * @@ -372,7 +375,7 @@ case class LoadDataCommand( * @param path Path instance based on the path string specified by the user. * @return qualified path object */ - private def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { + private[sql] def makeQualified(defaultUri: URI, workingDir: Path, path: Path): Path = { val pathUri = if (path.isAbsolute()) path.toUri() else new Path(workingDir, path).toUri() if (pathUri.getScheme == null || pathUri.getAuthority == null && defaultUri.getAuthority != null) { @@ -383,7 +386,7 @@ case class LoadDataCommand( pathUri.getAuthority } try { - val newUri = new URI(scheme, authority, pathUri.getPath, pathUri.getFragment) + val newUri = new URI(scheme, authority, pathUri.getPath, null, pathUri.getFragment) new Path(newUri) } catch { case e: URISyntaxException => http://git-wip-us.apache.org/repos/asf/spark/blob/d64b3558/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index e49aea2..dfcde8c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io.File +import java.net.URI import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import java.util.{Locale, Set} @@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.command.LoadDataCommand import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} @@ -1985,6 +1987,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("SPARK-25738: defaultFs can have a port") { + val defaultURI = new URI("hdfs://fizz.buzz.com:8020") + val r = LoadDataCommand.makeQualified(defaultURI, new Path("/foo/bar"), new Path("/flim/flam")) + assert(r === new Path("hdfs://fizz.buzz.com:8020/flim/flam")) + } + test("Insert overwrite with partition") { withTable("tableWithPartition") { sql( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
