Repository: hive Updated Branches: refs/heads/master 546fe8775 -> 1bdbdc4dd
HIVE-14270: Write temporary data to HDFS when doing inserts on tables located on S3 (Sergio Pena, reviewed by Ashutosh Chauhan, Lefty Leverenz) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1bdbdc4d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1bdbdc4d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1bdbdc4d Branch: refs/heads/master Commit: 1bdbdc4ddfc4e66ac06a99ce930a87cf52307440 Parents: 546fe87 Author: Sergio Pena <[email protected]> Authored: Wed Jul 27 10:34:08 2016 -0500 Committer: Sergio Pena <[email protected]> Committed: Thu Aug 11 11:12:20 2016 -0500 ---------------------------------------------------------------------- .../hadoop/hive/common/BlobStorageUtils.java | 54 +++++++++++ .../org/apache/hadoop/hive/conf/HiveConf.java | 11 ++- .../hive/common/TestBlobStorageUtils.java | 95 ++++++++++++++++++++ .../java/org/apache/hadoop/hive/ql/Context.java | 20 +++++ .../hive/ql/optimizer/GenMapRedUtils.java | 9 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 9 +- .../apache/hadoop/hive/ql/exec/TestContext.java | 63 +++++++++++++ 7 files changed, 253 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java new file mode 100644 index 0000000..6ca35e2 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; + +import java.util.Collection; + +/** + * Utilities for different blob (object) storage systems + */ +public class BlobStorageUtils { + private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false; + + public static boolean isBlobStoragePath(final Configuration conf, final Path path) { + return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme()); + } + + public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) { + return (fs == null) ? 
false : isBlobStorageScheme(conf, fs.getScheme()); + } + + public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) { + Collection<String> supportedBlobStoreSchemes = + conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname); + + return supportedBlobStoreSchemes.contains(scheme); + } + + public static boolean isBlobStorageAsScratchDir(final Configuration conf) { + return conf.getBoolean( + HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, + DISABLE_BLOBSTORAGE_AS_SCRATCHDIR + ); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 70816bd..3e9f6ec 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3084,7 +3084,16 @@ public class HiveConf extends Configuration { HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s", new TimeValidator(TimeUnit.SECONDS), "Timeout for Running Query in seconds. A nonpositive value means infinite. 
" + - "If the query timeout is also set by thrift API call, the smaller one will be taken."); + "If the query timeout is also set by thrift API call, the smaller one will be taken."), + + /* BLOBSTORE section */ + + HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n", + "Comma-separated list of supported blobstore schemes."), + + HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false, + "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties)."); + public final String varname; private final String altName; http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java ---------------------------------------------------------------------- diff --git a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java new file mode 100644 index 0000000..84a0d86 --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.common; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hadoop.hive.common.BlobStorageUtils.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +public class TestBlobStorageUtils { + private static final Configuration conf = new Configuration(); + + @Before + public void setUp() { + conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift"); + conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false); + } + + @Test + public void testValidAndInvalidPaths() throws IOException { + // Valid paths + assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path"))); + assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path"))); + + // Invalid paths + assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path"))); + assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file"))); + assertFalse(isBlobStoragePath(conf, null)); + assertFalse(isBlobStorageFileSystem(conf, null)); + assertFalse(isBlobStoragePath(conf, new Path(URI.create("")))); + } + + @Test + public void testValidAndInvalidFileSystems() { + FileSystem fs = mock(FileSystem.class); + + /* Valid FileSystem schemes */ + + doReturn("s3a").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + doReturn("swift").when(fs).getScheme(); + assertTrue(isBlobStorageFileSystem(conf, fs)); + + /* Invalid FileSystem schemes */ + + doReturn("hdfs").when(fs).getScheme(); + assertFalse(isBlobStorageFileSystem(conf, fs)); + + doReturn("").when(fs).getScheme(); + assertFalse(isBlobStorageFileSystem(conf, 
fs));
+
+    assertFalse(isBlobStorageFileSystem(conf, null));
+  }
+
+  @Test
+  public void testValidAndInvalidSchemes() {
+    // Valid schemes
+    assertTrue(isBlobStorageScheme(conf, "s3a"));
+    assertTrue(isBlobStorageScheme(conf, "swift"));
+
+    // Invalid schemes
+    assertFalse(isBlobStorageScheme(conf, "hdfs"));
+    assertFalse(isBlobStorageScheme(conf, ""));
+    assertFalse(isBlobStorageScheme(conf, null));
+  }
+} http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index 89893eb..3785b1e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.hooks.WriteEntity; @@ -351,6 +352,25 @@ public class Context { } } + /** + * Create a temporary directory depending on the path specified. + * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS) + * - If path is on HDFS, then create a staging directory inside the path + * + * @param path Path used to verify the Filesystem to use for temporary directory + * @return A path to the new temporary directory + */ + public Path getTempDirForPath(Path path) { + if (BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) { + // For better write performance, we use HDFS for temporary data when object store is used. 
+ // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system + // to take advantage of this performance. + return getMRTmpPath(); + } else { + return getExtTmpPathRelTo(path); + } + } + private Path getExternalScratchDir(URI extURI) { return getStagingDir(new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()), !explain); } http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 5bd7886..cea99e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -36,6 +36,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -1796,15 +1797,17 @@ public final class GenMapRedUtils { Path dest = null; if (chDir) { - dest = fsOp.getConf().getFinalDirName(); + FileSinkDesc fileSinkDesc = fsOp.getConf(); + dest = fileSinkDesc.getFinalDirName(); // generate the temporary file // it must be on the same file system as the current destination Context baseCtx = parseCtx.getContext(); - Path tmpDir = baseCtx.getExternalTmpPath(dest); + // Create the required temporary file in the HDFS location if the destination + // path of the FileSinkOperator table is a blobstore path. 
+ Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath()); - FileSinkDesc fileSinkDesc = fsOp.getConf(); // Change all the linked file sink descriptors if (fileSinkDesc.isLinkedFileSink()) { for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a01a7bd..6758741 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.common.BlobStorageUtils; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.ObjectPair; @@ -6642,7 +6643,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (isNonNativeTable) { queryTmpdir = dest_path; } else { - queryTmpdir = ctx.getExtTmpPathRelTo(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path); } if (dpCtx != null) { // set the root of the temporary path where dynamic partition columns will populate @@ -6759,7 +6760,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - queryTmpdir = ctx.getExternalTmpPath(dest_path); + queryTmpdir = ctx.getTempDirForPath(dest_path); table_desc = Utilities.getTableDesc(dest_tab); // Add sorting/bucketing if needed @@ -6807,7 +6808,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { try { Path qPath 
= FileUtils.makeQualified(dest_path, conf); - queryTmpdir = ctx.getExtTmpPathRelTo(qPath); + queryTmpdir = ctx.getTempDirForPath(qPath); } catch (Exception e) { throw new SemanticException("Error creating temporary folder on: " + dest_path, e); @@ -7015,7 +7016,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString()); if (!destTableIsMaterialization && HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) { - String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString(); + String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString(); fileSinkDesc.setStatsTmpDir(statsTmpLoc); LOG.debug("Set stats collection dir : " + statsTmpLoc); } http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java new file mode 100644 index 0000000..4a4c240 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class TestContext { +  private static HiveConf conf = new HiveConf(); + +  private Context context; + +  @Before +  public void setUp() throws IOException { +    /* Only called to create session directories used by the Context class */ +    SessionState.start(conf); +    SessionState.detachSession(); + +    context = new Context(conf); +  } + +  @Test +  public void testGetScratchDirectoriesForPaths() throws IOException { +    Context spyContext = spy(context); + +    // When Object store paths are used, then getMRTmpPath() is called to get a temporary +    // directory on the default scratch directory location (usually /tmp) +    Path mrTmpPath = new Path("hdfs://hostname/tmp/scratch"); +    doReturn(mrTmpPath).when(spyContext).getMRTmpPath(); +    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("s3a://bucket/dir"))); + +    // When Non-Object store paths are used, then getExtTmpPathRelTo is called to get a temporary +    // directory on the same path passed as a parameter +    Path tmpPathRelTo = new Path("hdfs://hostname/user"); +    doReturn(tmpPathRelTo).when(spyContext).getExtTmpPathRelTo(any(Path.class)); +    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("/user"))); +  } +}
