http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
new file mode 100644
index 0000000..ddcda4c
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreFS.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * Define a set of APIs that may vary in different environments.
+ */
+public interface MetaStoreFS {
+
+  /**
+   * Deletes a directory.
+   *
+   * @param fs file system to perform the delete on
+   * @param f path to delete
+   * @param recursive whether to delete the directory's contents recursively
+   * @param ifPurge whether to purge the data (presumably bypassing the trash; the exact
+   *     semantics are implementation-defined)
+   * @param conf configuration made available to the implementation
+   * @return true on success
+   * @throws MetaException on failure
+   */
+  boolean deleteDir(FileSystem fs, Path f, boolean recursive,
+      boolean ifPurge, Configuration conf) throws MetaException;
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
new file mode 100644
index 0000000..26e2c49
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetadataStore.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Stores and retrieves serialized file metadata keyed by file ID, together with optional
+ * additional columns created by file-format-specific metadata handlers.
+ */
+public interface MetadataStore {
+  /**
+   * Reads serialized file metadata for a list of files.
+   *
+   * @param fileIds file ID list.
+   * @param result The ref parameter, used to return the serialized file metadata
+   *               (presumably one slot per file ID, in the same order; confirm with implementations).
+   * @throws IOException if reading the metadata fails.
+   */
+  void getFileMetadata(List<Long> fileIds, ByteBuffer[] result) throws IOException;
+
+  /**
+   * Stores serialized file metadata for a list of files.
+   *
+   * @param fileIds file ID list.
+   * @param metadataBuffers Serialized file metadata, one per file ID.
+   * @param addedCols The column names for additional columns created by file-format-specific
+   *                  metadata handler, to be stored in the cache.
+   * @param addedVals The values for addedCols; one value per file ID per added column.
+   * @throws IOException if writing the metadata fails.
+   * @throws InterruptedException if the calling thread is interrupted while storing.
+   */
+  void storeFileMetadata(List<Long> fileIds, List<ByteBuffer> metadataBuffers,
+      ByteBuffer[] addedCols, ByteBuffer[][] addedVals) throws IOException, InterruptedException;
+
+  /**
+   * Stores serialized file metadata for a single file.
+   *
+   * @param fileId The file ID.
+   * @param metadata Serialized file metadata.
+   * @param addedCols The column names for additional columns created by file-format-specific
+   *                  metadata handler, to be stored in the cache.
+   * @param addedVals The values for addedCols; one value per added column.
+   * @throws IOException if writing the metadata fails.
+   * @throws InterruptedException if the calling thread is interrupted while storing.
+   */
+  void storeFileMetadata(long fileId, ByteBuffer metadata, ByteBuffer[] addedCols,
+      ByteBuffer[] addedVals) throws IOException, InterruptedException;
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
new file mode 100644
index 0000000..e5d21b0
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Contract implemented by every task that the metastore runs as a separate thread; such a
+ * task is both {@link Runnable} and {@link Configurable}.
+ */
+public interface MetastoreTaskThread extends Configurable, Runnable {
+
+  /**
+   * Returns how often the thread should be scheduled in the thread pool. Callers must invoke
+   * {@link #setConf(Configuration)} before calling this method.
+   * @param unit TimeUnit in which the returned frequency is expressed.
+   * @return frequency, expressed in {@code unit}
+   */
+  long runFrequency(TimeUnit unit);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
new file mode 100644
index 0000000..105511d
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionExpressionProxy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+
+/**
+ * Proxy that the metastore uses for QL operations it cannot call directly: QL depends on the
+ * metastore, so the metastore cannot depend on QL (a metastore-client module would fix this).
+ */
+public interface PartitionExpressionProxy {
+  /**
+   * Converts a serialized Hive expression into a filter in the format suitable for Filter.g.
+   * @param expr Serialized expression.
+   * @return The filter string.
+   * @throws MetaException on failure.
+   */
+  String convertExprToFilter(byte[] expr) throws MetaException;
+
+  /**
+   * Filters the partition names via a serialized Hive expression.
+   * @param partColumns Partition columns in the underlying table.
+   * @param expr Serialized expression.
+   * @param defaultPartitionName Default partition name from job or server configuration.
+   * @param partitionNames Partition names; the list is modified in place.
+   * @return Whether there were any unknown partitions preserved in the name list.
+   * @throws MetaException on failure.
+   */
+  boolean filterPartitionsByExpr(List<FieldSchema> partColumns,
+      byte[] expr, String defaultPartitionName, List<String> partitionNames) throws MetaException;
+
+  /**
+   * Determines the file metadata type from input format of the source table or partition.
+   * @param inputFormat Input format name.
+   * @return The file metadata type.
+   */
+  FileMetadataExprType getMetadataType(String inputFormat);
+
+  /**
+   * Gets a separate proxy that can be used to call file-format-specific methods.
+   * @param type The file metadata type.
+   * @return The proxy.
+   */
+  FileFormatProxy getFileFormatProxy(FileMetadataExprType type);
+
+  /**
+   * Creates SARG from serialized representation.
+   * @param expr SARG, serialized as Kryo.
+   * @return SARG.
+   */
+  SearchArgument createSarg(byte[] expr);
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
new file mode 100644
index 0000000..893c9f4
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplChangeManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplChangeManager.class);
+  private static ReplChangeManager instance;
+
+  private static boolean inited = false;
+  private static boolean enabled = false;
+  private static Path cmroot;
+  private static Configuration conf;
+  private String msUser;
+  private String msGroup;
+
+  private static final String ORIG_LOC_TAG = "user.original-loc";
+  static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+  private static final String URI_FRAGMENT_SEPARATOR = "#";
+  public static final String SOURCE_OF_REPLICATION = "repl.source.for";
+  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
+
+  public enum RecycleType {
+    MOVE,
+    COPY
+  }
+
+  public static class FileInfo {
+    private FileSystem srcFs;
+    private Path sourcePath;
+    private Path cmPath;
+    private String checkSum;
+    private boolean useSourcePath;
+    private String subDir;
+    private boolean copyDone;
+
+    public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) {
+      this(srcFs, sourcePath, null, null, true, subDir);
+    }
+    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath,
+                    String checkSum, boolean useSourcePath, String subDir) {
+      this.srcFs = srcFs;
+      this.sourcePath = sourcePath;
+      this.cmPath = cmPath;
+      this.checkSum = checkSum;
+      this.useSourcePath = useSourcePath;
+      this.subDir = subDir;
+      this.copyDone = false;
+    }
+    public FileSystem getSrcFs() {
+      return srcFs;
+    }
+    public Path getSourcePath() {
+      return sourcePath;
+    }
+    public Path getCmPath() {
+      return cmPath;
+    }
+    public String getCheckSum() {
+      return checkSum;
+    }
+    public boolean isUseSourcePath() {
+      return useSourcePath;
+    }
+    public void setIsUseSourcePath(boolean useSourcePath) {
+      this.useSourcePath = useSourcePath;
+    }
+    public String getSubDir() {
+      return subDir;
+    }
+    public boolean isCopyDone() {
+      return copyDone;
+    }
+    public void setCopyDone(boolean copyDone) {
+      this.copyDone = copyDone;
+    }
+    public Path getEffectivePath() {
+      if (useSourcePath) {
+        return sourcePath;
+      } else {
+        return cmPath;
+      }
+    }
+  }
+
+  public static synchronized ReplChangeManager getInstance(Configuration conf)
+      throws MetaException {
+    if (instance == null) {
+      instance = new ReplChangeManager(conf);
+    }
+    return instance;
+  }
+
+  private ReplChangeManager(Configuration conf) throws MetaException {
+    try {
+      if (!inited) {
+        if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
+          ReplChangeManager.enabled = true;
+          ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR));
+          ReplChangeManager.conf = conf;
+
+          FileSystem cmFs = cmroot.getFileSystem(conf);
+          // Create cmroot with permission 700 if not exist
+          if (!cmFs.exists(cmroot)) {
+            cmFs.mkdirs(cmroot);
+            cmFs.setPermission(cmroot, new FsPermission("700"));
+          }
+          UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
+          msUser = usergroupInfo.getShortUserName();
+          msGroup = usergroupInfo.getPrimaryGroupName();
+        }
+        inited = true;
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  // Filter files starts with ".". Note Hadoop consider files starts with
+  // "." or "_" as hidden file. However, we need to replicate files starts
+  // with "_". We find at least 2 use cases:
+  // 1. For har files, _index and _masterindex is required files
+  // 2. _success file is required for Oozie to indicate availability of data source
+  private static final PathFilter hiddenFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      return !p.getName().startsWith(".");
+    }
+  };
+
+  /***
+   * Move a path into cmroot. If the path is a directory (of a partition, or table if nonpartitioned),
+   *   recursively move files inside directory to cmroot. Note the table must be managed table
+   * @param path a single file or directory
+   * @param type if the files to be copied or moved to cmpath.
+   *             Copy is costly but preserve the source file
+   * @param ifPurge if the file should skip Trash when move/delete source file.
+   *                This is referred only if type is MOVE.
+   * @return number of files newly moved/copied into cmroot (0 if change management is disabled)
+   * @throws IOException on file system errors
+   */
+  public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
+    if (!enabled) {
+      return 0;
+    }
+
+    int count = 0;
+    FileSystem fs = path.getFileSystem(conf);
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+      for (FileStatus file : files) {
+        count += recycle(file.getPath(), type, ifPurge);
+      }
+    } else {
+      String fileCheckSum = checksumFor(path, fs);
+      Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString());
+
+      // set timestamp before moving to cmroot, so we can
+      // avoid race condition CM remove the file before setting
+      // timestamp
+      long now = System.currentTimeMillis();
+      fs.setTimes(path, now, -1);
+
+      boolean success = false;
+      // checksumFor() returns null when the file system provides no checksum (e.g. local fs). Then we
+      // cannot tell whether cmPath holds the same content, so never treat it as a duplicate (no NPE).
+      if (fileCheckSum != null && fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+        // If already a file with same checksum exists in cmPath, just ignore the copy/move
+        // Also, mark the operation is unsuccessful to notify that file with same name already
+        // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
+        // CM cleaner.
+        success = false;
+      } else {
+        switch (type) {
+        case MOVE: {
+          LOG.info("Moving {} to {}", path.toString(), cmPath.toString());
+
+          // Rename fails if the file with same name already exist.
+          success = fs.rename(path, cmPath);
+          break;
+        }
+        case COPY: {
+          LOG.info("Copying {} to {}", path.toString(), cmPath.toString());
+
+          // It is possible to have a file with same checksum in cmPath but the content is
+          // partially copied or corrupted. In this case, just overwrite the existing file with
+          // new one.
+          success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+          break;
+        }
+        default:
+          // Operation fails as invalid input
+          break;
+        }
+      }
+
+      // Ignore if a file with same content already exist in cmroot
+      // We might want to setXAttr for the new location in the future
+      if (success) {
+        // set the file owner to hive (or the id metastore run as)
+        fs.setOwner(cmPath, msUser, msGroup);
+
+        // tag the original file name so we know where the file comes from
+        // Note we currently only track the last known trace as
+        // xattr has limited capacity. We shall revisit and store all original
+        // locations if orig-loc becomes important
+        try {
+          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes(StandardCharsets.UTF_8));
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", path.toString());
+        }
+
+        count++;
+      } else {
+        LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
+        // Need to extend the tenancy if we saw a newer file with the same content
+        fs.setTimes(cmPath, now, -1);
+      }
+
+      // Tag if we want to remain in trash after deletion.
+      // If multiple files share the same content, then
+      // any file claim remain in trash would be granted
+      if ((type == RecycleType.MOVE) && !ifPurge) {
+        try {
+          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+        } catch (UnsupportedOperationException e) {
+          LOG.warn("Error setting xattr for {}", cmPath.toString());
+        }
+      }
+    }
+    return count;
+  }
+
+  // Get checksum of a file, or null if the file system does not provide one
+  public static String checksumFor(Path path, FileSystem fs) throws IOException {
+    // TODO: fs checksum only available on hdfs, need to
+    //       find a solution for other fs (eg, local fs, s3, etc)
+    String checksumString = null;
+    FileChecksum checksum = fs.getFileChecksum(path);
+    if (checksum != null) {
+      checksumString = StringUtils.byteToHexString(
+          checksum.getBytes(), 0, checksum.getLength());
+    }
+    return checksumString;
+  }
+
+  /***
+   * Convert a path of file inside a partition or table (if non-partitioned)
+   *   to a deterministic location of cmroot. So user can retrieve the file back
+   *   with the original location plus checksum.
+   * @param conf Hive configuration
+   * @param name original filename
+   * @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
+   * @param cmRootUri CM Root URI. (From remote source if REPL LOAD flow. From local config if recycle.)
+   * @return Path
+   */
+  static Path getCMPath(Configuration conf, String name, String checkSum, String cmRootUri) {
+    String newFileName = name + "_" + checkSum;
+    int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+
+    if (newFileName.length() > maxLength) {
+      // Truncates to maxLength - 1 characters. Kept as-is on purpose: the CM path must stay
+      // deterministic so files recycled earlier can still be located by name and checksum.
+      newFileName = newFileName.substring(0, maxLength-1);
+    }
+
+    return new Path(cmRootUri, newFileName);
+  }
+
+  /***
+   * Get original file specified by src and chksumString. If the file exists and checksum
+   * matches, return the file; otherwise, use chksumString to retrieve it from cmroot
+   * @param src Original file location
+   * @param checksumString Checksum of the original file
+   * @param srcCMRootURI CM root URI of the source cluster
+   * @param subDir Sub directory the source file belongs to
+   * @param conf Hive configuration
+   * @return Corresponding FileInfo object
+   */
+  public static FileInfo getFileInfo(Path src, String checksumString, String srcCMRootURI, String subDir,
+                                     Configuration conf) throws MetaException {
+    try {
+      FileSystem srcFs = src.getFileSystem(conf);
+      if (checksumString == null) {
+        return new FileInfo(srcFs, src, subDir);
+      }
+
+      Path cmPath = getCMPath(conf, src.getName(), checksumString, srcCMRootURI);
+      if (!srcFs.exists(src)) {
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+
+      String currentChecksumString;
+      try {
+        currentChecksumString = checksumFor(src, srcFs);
+      } catch (IOException ex) {
+        // If the file is missing or getting modified, then refer CM path
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+      if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) {
+        return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir);
+      } else {
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  /***
+   * Concatenate filename, checksum, source cmroot uri and subdirectory with "#"
+   * @param fileUriStr Filename string
+   * @param fileChecksum Checksum string
+   * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means,
+   *                      the multiple levels of subdirectories are concatenated with path separator "/"
+   * @return Concatenated Uri string
+   */
+  // TODO: this needs to be enhanced once change management based filesystem is implemented
+  // Currently using fileuri#checksum#cmrooturi#subdirs as the format
+  public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir)
+          throws IOException {
+    String encodedUri = fileUriStr;
+    if ((fileChecksum != null) && (cmroot != null)) {
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum
+              + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf);
+    } else {
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR;
+    }
+    encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + ((encodedSubDir != null) ? encodedSubDir : "");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Encoded URI: " + encodedUri);
+    }
+    return encodedUri;
+  }
+
+  /***
+   * Split uri with fragment into file uri, checksum, source cmroot uri and subdirs.
+   * Currently using fileuri#checksum#cmrooturi#subdirs as the format.
+   * @param fileURIStr uri with fragment
+   * @return array of file uri, checksum, source CM root URI and subdirs; missing parts are null
+   */
+  public static String[] decodeFileUri(String fileURIStr) {
+    String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
+    String[] result = new String[4];
+    result[0] = uriAndFragment[0];
+    if ((uriAndFragment.length > 1) && !StringUtils.isEmpty(uriAndFragment[1])) {
+      result[1] = uriAndFragment[1];
+    }
+    if ((uriAndFragment.length > 2) && !StringUtils.isEmpty(uriAndFragment[2])) {
+      result[2] = uriAndFragment[2];
+    }
+    if ((uriAndFragment.length > 3) && !StringUtils.isEmpty(uriAndFragment[3])) {
+      result[3] = uriAndFragment[3];
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reading Encoded URI: " + result[0] + ":: " + result[1] + ":: " + result[2] + ":: " + result[3]);
+    }
+    return result;
+  }
+
+  public static boolean isCMFileUri(Path fromPath) {
+    String[] result = decodeFileUri(fromPath.toString());
+    return result[1] != null;
+  }
+
+  /**
+   * Thread to clear old files of cmroot recursively
+   */
+  static class CMClearer implements Runnable {
+    private Path cmroot;
+    private long secRetain;
+    private Configuration conf;
+
+    CMClearer(String cmrootString, long secRetain, Configuration conf) {
+      this.cmroot = new Path(cmrootString);
+      this.secRetain = secRetain;
+      this.conf = conf;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("CMClearer started");
+
+        long now = System.currentTimeMillis();
+        FileSystem fs = cmroot.getFileSystem(conf);
+        FileStatus[] files = fs.listStatus(cmroot);
+
+        for (FileStatus file : files) {
+          long modifiedTime = file.getModificationTime();
+          if (now - modifiedTime > secRetain*1000) {
+            try {
+              if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+                boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf);
+                if (succ) {
+                  LOG.debug("Move {} to trash", file);
+                } else {
+                  LOG.warn("Fail to move " + file.toString() + " to trash");
+                }
+              } else {
+                boolean succ = fs.delete(file.getPath(), false);
+                if (succ) {
+                  LOG.debug("Remove {}", file);
+                } else {
+                  LOG.warn("Fail to remove " + file.toString());
+                }
+              }
+            } catch (UnsupportedOperationException e) {
+              LOG.warn("Error getting xattr for " + file.getPath().toString());
+            }
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  // Schedule CMClearer thread. Will be invoked by metastore
+  static void scheduleCMClearer(Configuration conf) {
+    if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) {
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+          .namingPattern("cmclearer-%d")
+          .daemon(true)
+          .build());
+      executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR),
+          MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf),
+          0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+
+  public static boolean isSourceOfReplication(Database db) {
+    assert (db != null);
+    String replPolicyIds = getReplPolicyIdString(db);
+    return !StringUtils.isEmpty(replPolicyIds);
+  }
+
+  public static String getReplPolicyIdString(Database db) {
+    if (db != null) {
+      Map<String, String> m = db.getParameters();
+      if ((m != null) && (m.containsKey(SOURCE_OF_REPLICATION))) {
+        String replPolicyId = m.get(SOURCE_OF_REPLICATION);
+        LOG.debug("repl policy for database {} is {}", db.getName(), replPolicyId);
+        return replPolicyId;
+      }
+      LOG.debug("Repl policy is not set for database {}", db.getName());
+    }
+    return null;
+  }
+
+  public static String joinWithSeparator(Iterable<?> strings) {
+    return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
+  }
+
+  public static String[] getListFromSeparatedString(String separatedString) {
+    return separatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*");
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
new file mode 100644
index 0000000..f97f638
--- /dev/null
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.classification.RetrySemantics; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.annotation.NoReconnect; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.transport.TTransportException; + +import com.google.common.annotations.VisibleForTesting; + +/** + * RetryingMetaStoreClient. Creates a proxy for a IMetaStoreClient + * implementation and retries calls to it on failure. + * If the login user is authenticated using keytab, it relogins user before + * each call. 
+ * + */ [email protected] +public class RetryingMetaStoreClient implements InvocationHandler { + + private static final Logger LOG = LoggerFactory.getLogger(RetryingMetaStoreClient.class.getName()); + + private final IMetaStoreClient base; + private final UserGroupInformation ugi; + private final int retryLimit; + private final long retryDelaySeconds; + private final ConcurrentHashMap<String, Long> metaCallTimeMap; + private final long connectionLifeTimeInMillis; + private long lastConnectionTime; + private boolean localMetaStore; + + + protected RetryingMetaStoreClient(Configuration conf, Class<?>[] constructorArgTypes, + Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, + Class<? extends IMetaStoreClient> msClientClass) throws MetaException { + + this.ugi = getUGI(); + + if (this.ugi == null) { + LOG.warn("RetryingMetaStoreClient unable to determine current user UGI."); + } + + this.retryLimit = MetastoreConf.getIntVar(conf, ConfVars.THRIFT_FAILURE_RETRIES); + this.retryDelaySeconds = MetastoreConf.getTimeVar(conf, + ConfVars.CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); + this.metaCallTimeMap = metaCallTimeMap; + this.connectionLifeTimeInMillis = MetastoreConf.getTimeVar(conf, + ConfVars.CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS); + this.lastConnectionTime = System.currentTimeMillis(); + String msUri = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + localMetaStore = (msUri == null) || msUri.trim().isEmpty(); + + reloginExpiringKeytabUser(); + + this.base = JavaUtils.newInstance(msClientClass, constructorArgTypes, constructorArgs); + + LOG.info("RetryingMetaStoreClient proxy=" + msClientClass + " ugi=" + this.ugi + + " retries=" + this.retryLimit + " delay=" + this.retryDelaySeconds + + " lifetime=" + this.connectionLifeTimeInMillis); + } + + public static IMetaStoreClient getProxy( + Configuration hiveConf, boolean allowEmbedded) throws MetaException { + return getProxy(hiveConf, new Class[]{Configuration.class, 
HiveMetaHookLoader.class, Boolean.class}, + new Object[]{hiveConf, null, allowEmbedded}, null, HiveMetaStoreClient.class.getName() + ); + } + + @VisibleForTesting + public static IMetaStoreClient getProxy(Configuration hiveConf, HiveMetaHookLoader hookLoader, + String mscClassName) throws MetaException { + return getProxy(hiveConf, hookLoader, null, mscClassName, true); + } + + public static IMetaStoreClient getProxy(Configuration hiveConf, HiveMetaHookLoader hookLoader, + ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName, boolean allowEmbedded) + throws MetaException { + + return getProxy(hiveConf, + new Class[] {Configuration.class, HiveMetaHookLoader.class, Boolean.class}, + new Object[] {hiveConf, hookLoader, allowEmbedded}, + metaCallTimeMap, + mscClassName + ); + } + + /** + * This constructor is meant for Hive internal use only. + * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for external purpose. + */ + public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] constructorArgTypes, + Object[] constructorArgs, String mscClassName) throws MetaException { + return getProxy(hiveConf, constructorArgTypes, constructorArgs, null, mscClassName); + } + + /** + * This constructor is meant for Hive internal use only. + * Please use getProxy(HiveConf conf, HiveMetaHookLoader hookLoader) for external purpose. + */ + public static IMetaStoreClient getProxy(Configuration hiveConf, Class<?>[] constructorArgTypes, + Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, + String mscClassName) throws MetaException { + + @SuppressWarnings("unchecked") + Class<? 
extends IMetaStoreClient> baseClass = + JavaUtils.getClass(mscClassName, IMetaStoreClient.class); + + RetryingMetaStoreClient handler = + new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs, + metaCallTimeMap, baseClass); + return (IMetaStoreClient) Proxy.newProxyInstance( + RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Object ret; + int retriesMade = 0; + TException caughtException; + + boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class); + boolean allowRetry = true; + Annotation[] directives = method.getDeclaredAnnotations(); + if(directives != null) { + for(Annotation a : directives) { + if(a instanceof RetrySemantics.CannotRetry) { + allowRetry = false; + } + } + } + + while (true) { + try { + reloginExpiringKeytabUser(); + + if (allowReconnect) { + if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { + if (this.ugi != null) { + // Perform reconnect with the proper user context + try { + LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi); + + this.ugi.doAs( + new PrivilegedExceptionAction<Object> () { + @Override + public Object run() throws MetaException { + base.reconnect(); + return null; + } + }); + } catch (UndeclaredThrowableException e) { + Throwable te = e.getCause(); + if (te instanceof PrivilegedActionException) { + throw te.getCause(); + } else { + throw te; + } + } + lastConnectionTime = System.currentTimeMillis(); + } else { + LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information."); + throw new MetaException("UGI information unavailable. 
Will not attempt a reconnect."); + } + } + } + + if (metaCallTimeMap == null) { + ret = method.invoke(base, args); + } else { + // need to capture the timing + long startTime = System.currentTimeMillis(); + ret = method.invoke(base, args); + long timeTaken = System.currentTimeMillis() - startTime; + addMethodTime(method, timeTaken); + } + break; + } catch (UndeclaredThrowableException e) { + throw e.getCause(); + } catch (InvocationTargetException e) { + Throwable t = e.getCause(); + if (t instanceof TApplicationException) { + TApplicationException tae = (TApplicationException)t; + switch (tae.getType()) { + case TApplicationException.UNSUPPORTED_CLIENT_TYPE: + case TApplicationException.UNKNOWN_METHOD: + case TApplicationException.WRONG_METHOD_NAME: + case TApplicationException.INVALID_PROTOCOL: + throw t; + default: + // TODO: most other options are probably unrecoverable... throw? + caughtException = tae; + } + } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) { + // TODO: most protocol exceptions are probably unrecoverable... throw? + caughtException = (TException)t; + } else if ((t instanceof MetaException) && t.getMessage().matches( + "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") && + !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { + caughtException = (MetaException)t; + } else { + throw t; + } + } catch (MetaException e) { + if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") && + !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { + caughtException = e; + } else { + throw e; + } + } + + + if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) { + throw caughtException; + } + retriesMade++; + LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " + + retryLimit + ") after " + retryDelaySeconds + "s. 
" + method.getName(), caughtException); + Thread.sleep(retryDelaySeconds * 1000); + } + return ret; + } + + /** + * Returns the UGI for the current user. + * @return the UGI for the current user. + */ + private UserGroupInformation getUGI() { + UserGroupInformation ugi = null; + + try { + ugi = UserGroupInformation.getCurrentUser(); + } catch (IOException e) { + // Swallow the exception and let the call determine what to do. + } + + return ugi; + } + + private void addMethodTime(Method method, long timeTaken) { + String methodStr = getMethodString(method); + while (true) { + Long curTime = metaCallTimeMap.get(methodStr), newTime = timeTaken; + if (curTime != null && metaCallTimeMap.replace(methodStr, curTime, newTime + curTime)) break; + if (curTime == null && (null == metaCallTimeMap.putIfAbsent(methodStr, newTime))) break; + } + } + + /** + * @param method + * @return String representation with arg types. eg getDatabase_(String, ) + */ + private String getMethodString(Method method) { + StringBuilder methodSb = new StringBuilder(method.getName()); + methodSb.append("_("); + for (Class<?> paramClass : method.getParameterTypes()) { + methodSb.append(paramClass.getSimpleName()); + methodSb.append(", "); + } + methodSb.append(")"); + return methodSb.toString(); + } + + private boolean hasConnectionLifeTimeReached(Method method) { + if (connectionLifeTimeInMillis <= 0 || localMetaStore) { + return false; + } + + boolean shouldReconnect = + (System.currentTimeMillis() - lastConnectionTime) >= connectionLifeTimeInMillis; + if (LOG.isDebugEnabled()) { + LOG.debug("Reconnection status for Method: " + method.getName() + " is " + shouldReconnect); + } + return shouldReconnect; + } + + /** + * Relogin if login user is logged in using keytab + * Relogin is actually done by ugi code only if sufficient time has passed + * A no-op if kerberos security is not enabled + * @throws MetaException + */ + private void reloginExpiringKeytabUser() throws MetaException { + 
if(!UserGroupInformation.isSecurityEnabled()){ + return; + } + try { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + //checkTGT calls ugi.relogin only after checking if it is close to tgt expiry + //hadoop relogin is actually done only every x minutes (x=10 in hadoop 1.x) + if(ugi.isFromKeytab()){ + ugi.checkTGTAndReloginFromKeytab(); + } + } catch (IOException e) { + String msg = "Error doing relogin using keytab " + e.getMessage(); + LOG.error(msg, e); + throw new MetaException(msg); + } + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java new file mode 100644 index 0000000..1a17fe3 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/TableIterable.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.metastore; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; + +/** + * Use this to get Table objects for a table list. It provides an iterator to + * on the resulting Table objects. It batches the calls to + * IMetaStoreClient.getTableObjectsByName to avoid OOM issues in HS2 (with + * embedded metastore) or MetaStore server (if HS2 is using remote metastore). + * + */ +public class TableIterable implements Iterable<Table> { + + @Override + public Iterator<Table> iterator() { + return new Iterator<Table>() { + + private final Iterator<String> tableNamesIter = tableNames.iterator(); + private Iterator<org.apache.hadoop.hive.metastore.api.Table> batchIter = null; + + @Override + public boolean hasNext() { + return ((batchIter != null) && batchIter.hasNext()) || tableNamesIter.hasNext(); + } + + @Override + public Table next() { + if ((batchIter == null) || !batchIter.hasNext()) { + getNextBatch(); + } + return batchIter.next(); + } + + private void getNextBatch() { + // get next batch of table names in this list + List<String> nameBatch = new ArrayList<String>(); + int batchCounter = 0; + while (batchCounter < batchSize && tableNamesIter.hasNext()) { + nameBatch.add(tableNamesIter.next()); + batchCounter++; + } + // get the Table objects for this batch of table names and get iterator + // on it + + try { + if (catName != null) { + batchIter = msc.getTableObjectsByName(catName, dbname, nameBatch).iterator(); + } else { + batchIter = msc.getTableObjectsByName(dbname, nameBatch).iterator(); + } + } catch (TException e) { + throw new RuntimeException(e); + } + + } + + @Override + public void remove() { + throw new IllegalStateException( + "TableIterable is a read-only iterable and remove() is unsupported"); + } + }; + } + + private final IMetaStoreClient msc; + private final String dbname; + private final 
List<String> tableNames; + private final int batchSize; + private final String catName; + + /** + * Primary constructor that fetches all tables in a given msc, given a Hive + * object,a db name and a table name list. + */ + public TableIterable(IMetaStoreClient msc, String dbname, List<String> tableNames, int batchSize) + throws TException { + this.msc = msc; + this.catName = null; + this.dbname = dbname; + this.tableNames = tableNames; + this.batchSize = batchSize; + } + + public TableIterable(IMetaStoreClient msc, String catName, String dbname, List<String> + tableNames, int batchSize) throws TException { + this.msc = msc; + this.catName = catName; + this.dbname = dbname; + this.tableNames = tableNames; + this.batchSize = batchSize; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/35f86c74/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java new file mode 100755 index 0000000..294dfb7 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java @@ -0,0 +1,759 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.metastore.api.Catalog; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.HdfsUtils; +import org.apache.hadoop.hive.metastore.utils.JavaUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; 
+import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * This class represents a warehouse where data of Hive tables is stored + */ +public class Warehouse { + public static final String DEFAULT_CATALOG_NAME = "hive"; + public static final String DEFAULT_CATALOG_COMMENT = "Default catalog, for Hive"; + public static final String DEFAULT_DATABASE_NAME = "default"; + public static final String DEFAULT_DATABASE_COMMENT = "Default Hive database"; + public static final String DEFAULT_SERIALIZATION_FORMAT = "1"; + public static final String DATABASE_WAREHOUSE_SUFFIX = ".db"; + private static final String CAT_DB_TABLE_SEPARATOR = "."; + + private Path whRoot; + private Path whRootExternal; + private final Configuration conf; + private final String whRootString; + private final String whRootExternalString; + + public static final Logger LOG = LoggerFactory.getLogger("hive.metastore.warehouse"); + + private MetaStoreFS fsHandler = null; + private boolean storageAuthCheck = false; + private ReplChangeManager cm = null; + + public Warehouse(Configuration conf) throws MetaException { + this.conf = conf; + whRootString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE); + if (StringUtils.isBlank(whRootString)) { + throw new MetaException(ConfVars.WAREHOUSE.getVarname() + + " is not set in the config or blank"); + } + whRootExternalString = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE_EXTERNAL); + fsHandler = getMetaStoreFsHandler(conf); + cm = ReplChangeManager.getInstance(conf); + storageAuthCheck = MetastoreConf.getBoolVar(conf, ConfVars.AUTHORIZATION_STORAGE_AUTH_CHECKS); + } + + private MetaStoreFS getMetaStoreFsHandler(Configuration conf) + throws MetaException { + String handlerClassStr = MetastoreConf.getVar(conf, ConfVars.FS_HANDLER_CLS); + try { + Class<? extends MetaStoreFS> handlerClass = (Class<? 
extends MetaStoreFS>) Class + .forName(handlerClassStr, true, JavaUtils.getClassLoader()); + MetaStoreFS handler = ReflectionUtils.newInstance(handlerClass, conf); + return handler; + } catch (ClassNotFoundException e) { + throw new MetaException("Error in loading MetaStoreFS handler." + + e.getMessage()); + } + } + + + /** + * Helper functions to convert IOException to MetaException + */ + public static FileSystem getFs(Path f, Configuration conf) throws MetaException { + try { + return f.getFileSystem(conf); + } catch (IOException e) { + MetaStoreUtils.logAndThrowMetaException(e); + } + return null; + } + + public FileSystem getFs(Path f) throws MetaException { + return getFs(f, conf); + } + + + /** + * Hadoop File System reverse lookups paths with raw ip addresses The File + * System URI always contains the canonical DNS name of the Namenode. + * Subsequently, operations on paths with raw ip addresses cause an exception + * since they don't match the file system URI. + * + * This routine solves this problem by replacing the scheme and authority of a + * path with the scheme and authority of the FileSystem that it maps to. + * + * @param path + * Path to be canonicalized + * @return Path with canonical scheme and authority + */ + public static Path getDnsPath(Path path, Configuration conf) throws MetaException { + FileSystem fs = getFs(path, conf); + String uriPath = path.toUri().getPath(); + if (StringUtils.isEmpty(uriPath)) { + uriPath = "/"; + } + return (new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), uriPath)); + } + + public Path getDnsPath(Path path) throws MetaException { + return getDnsPath(path, conf); + } + + /** + * Resolve the configured warehouse root dir with respect to the configuration + * This involves opening the FileSystem corresponding to the warehouse root + * dir (but that should be ok given that this is only called during DDL + * statements for non-external tables). 
+ */ + public Path getWhRoot() throws MetaException { + if (whRoot != null) { + return whRoot; + } + whRoot = getDnsPath(new Path(whRootString)); + return whRoot; + } + + public Path getWhRootExternal() throws MetaException { + if (whRootExternal != null) { + return whRootExternal; + } + if (!hasExternalWarehouseRoot()) { + whRootExternal = getWhRoot(); + } else { + whRootExternal = getDnsPath(new Path(whRootExternalString)); + } + return whRootExternal; + } + + /** + * Build the database path based on catalog name and database name. This should only be used + * when a database is being created or altered. If you just want to find out the path a + * database is already using call {@link #getDatabasePath(Database)}. If the passed in + * database already has a path set that will be used. If not the location will be built using + * catalog's path and the database name. + * @param cat catalog the database is in + * @param db database object + * @return Path representing the directory for the database + * @throws MetaException when the file path cannot be properly determined from the configured + * file system. + */ + public Path determineDatabasePath(Catalog cat, Database db) throws MetaException { + if (db.isSetLocationUri()) { + return getDnsPath(new Path(db.getLocationUri())); + } + if (cat == null || cat.getName().equalsIgnoreCase(DEFAULT_CATALOG_NAME)) { + if (db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { + return getWhRoot(); + } else { + return new Path(getWhRoot(), dbDirFromDbName(db)); + } + } else { + return new Path(getDnsPath(new Path(cat.getLocationUri())), dbDirFromDbName(db)); + } + } + + private String dbDirFromDbName(Database db) throws MetaException { + return db.getName().toLowerCase() + DATABASE_WAREHOUSE_SUFFIX; + } + + /** + * Get the path specified by the database. In the case of the default database the root of the + * warehouse is returned. 
+ * @param db database to get the path of + * @return path to the database directory + * @throws MetaException when the file path cannot be properly determined from the configured + * file system. + */ + public Path getDatabasePath(Database db) throws MetaException { + if (db.getCatalogName().equalsIgnoreCase(DEFAULT_CATALOG_NAME) && + db.getName().equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { + return getWhRoot(); + } + return new Path(db.getLocationUri()); + } + + public Path getDefaultDatabasePath(String dbName) throws MetaException { + // TODO CAT - I am fairly certain that most calls to this are in error. This should only be + // used when the database location is unset, which should never happen except when a + // new database is being created. Once I have confirmation of this change calls of this to + // getDatabasePath(), since it does the right thing. Also, merge this with + // determineDatabasePath() as it duplicates much of the logic. + if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { + return getWhRoot(); + } + return new Path(getWhRoot(), dbName.toLowerCase() + DATABASE_WAREHOUSE_SUFFIX); + } + + public Path getDefaultExternalDatabasePath(String dbName) throws MetaException { + if (dbName.equalsIgnoreCase(DEFAULT_DATABASE_NAME)) { + return getWhRootExternal(); + } + return new Path(getWhRootExternal(), dbName.toLowerCase() + DATABASE_WAREHOUSE_SUFFIX); + } + + private boolean hasExternalWarehouseRoot() { + return !StringUtils.isBlank(whRootExternalString); + } + + /** + * Returns the default location of the table path using the parent database's location + * @param db Database where the table is created + * @param tableName table name + * @return + * @throws MetaException + */ + @Deprecated + public Path getDefaultTablePath(Database db, String tableName) + throws MetaException { + return getDefaultTablePath(db, tableName, false); + } + + public Path getDefaultTablePath(Database db, String tableName, boolean isExternal) throws MetaException { + Path 
dbPath = null; + if (isExternal && hasExternalWarehouseRoot()) { + dbPath = getDefaultExternalDatabasePath(db.getName()); + } else { + dbPath = getDatabasePath(db); + } + return getDnsPath( + new Path(dbPath, MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); + } + + // A few situations where we need the default table path, without a DB object + public Path getDefaultTablePath(String dbName, String tableName, boolean isExternal) throws MetaException { + Path dbPath = null; + if (isExternal && hasExternalWarehouseRoot()) { + dbPath = getDefaultExternalDatabasePath(dbName); + } else { + dbPath = getDefaultDatabasePath(dbName); + } + return getDnsPath( + new Path(dbPath, MetaStoreUtils.encodeTableName(tableName.toLowerCase()))); + } + + public Path getDefaultTablePath(Database db, Table table) throws MetaException { + return getDefaultTablePath(db, table.getTableName(), MetaStoreUtils.isExternalTable(table)); + } + + @Deprecated // Use TableName + public static String getQualifiedName(Table table) { + return TableName.getDbTable(table.getDbName(), table.getTableName()); + } + + @Deprecated // Use TableName + public static String getQualifiedName(String dbName, String tableName) { + return TableName.getDbTable(dbName, tableName); + } + + public static String getQualifiedName(Partition partition) { + return partition.getDbName() + "." + partition.getTableName() + partition.getValues(); + } + + /** + * Get table name in cat.db.table format. + * @param table table object + * @return fully qualified name. 
+   */
+  public static String getCatalogQualifiedTableName(Table table) {
+    return TableName.getQualified(table.getCatName(), table.getDbName(), table.getTableName());
+  }
+
+  /**
+   * Creates the directory {@code f} using the file system that owns it.
+   *
+   * @param f the directory to create
+   * @return the value returned by {@code FileUtils.mkdir}
+   * @throws MetaException when an {@code IOException} occurs while creating the directory
+   */
+  public boolean mkdirs(Path f) throws MetaException {
+    try {
+      FileSystem fs = getFs(f);
+      return FileUtils.mkdir(fs, f);
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    // Only reached if logAndThrowMetaException returns instead of throwing.
+    return false;
+  }
+
+  /**
+   * Renames (moves) {@code sourcePath} to {@code destPath}. The source and destination file
+   * systems are resolved independently.
+   *
+   * @param sourcePath the path to move
+   * @param destPath the target path
+   * @param needCmRecycle if true, a copy of the source files is first placed in the change
+   *     management root (cmroot) so the data stays available after the move
+   * @return the value returned by {@code FileUtils.rename}
+   * @throws MetaException if any step fails; every exception is routed through
+   *     {@code MetaStoreUtils.logAndThrowMetaException}
+   */
+  public boolean renameDir(Path sourcePath, Path destPath, boolean needCmRecycle) throws MetaException {
+    try {
+      if (needCmRecycle) {
+        // Copy the source files to cmroot. As the client will move the source files to another
+        // location, we should make a copy of the files to cmroot instead of moving it.
+        cm.recycle(sourcePath, RecycleType.COPY, true);
+      }
+      FileSystem srcFs = getFs(sourcePath);
+      FileSystem destFs = getFs(destPath);
+      return FileUtils.rename(srcFs, destFs, sourcePath, destPath);
+    } catch (Exception ex) {
+      MetaStoreUtils.logAndThrowMetaException(ex);
+    }
+    return false;
+  }
+
+  /**
+   * Copies {@code file} into the change management root ({@code RecycleType.COPY}).
+   *
+   * @throws MetaException wrapping any {@code IOException} raised by the recycle call
+   */
+  void addToChangeManagement(Path file) throws MetaException {
+    try {
+      cm.recycle(file, RecycleType.COPY, true);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Deletes {@code f} without purging; equivalent to
+   * {@code deleteDir(f, recursive, false, db)}.
+   */
+  public boolean deleteDir(Path f, boolean recursive, Database db) throws MetaException {
+    return deleteDir(f, recursive, false, db);
+  }
+
+  /**
+   * Deletes {@code f}, recycling it into the change management root first when {@code db} is a
+   * source of replication (as reported by {@code ReplChangeManager.isSourceOfReplication}).
+   */
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, Database db) throws MetaException {
+    return deleteDir(f, recursive, ifPurge, ReplChangeManager.isSourceOfReplication(db));
+  }
+
+  /**
+   * Deletes {@code f} through the configured {@code MetaStoreFS} handler.
+   *
+   * @param f the path to delete
+   * @param recursive passed through to the handler
+   * @param ifPurge passed to both the recycle call and the handler
+   * @param needCmRecycle if true, {@code f} is first moved into the change management root
+   *     ({@code RecycleType.MOVE})
+   * @return the value returned by {@code fsHandler.deleteDir}
+   * @throws MetaException wrapping any {@code IOException} from the recycle step, or as thrown by
+   *     the handler
+   */
+  public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
+    if (needCmRecycle) {
+      try {
+        cm.recycle(f, RecycleType.MOVE, ifPurge);
+      } catch (IOException e) {
+        throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+    }
+    FileSystem fs = getFs(f);
+    return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
+  }
+
+  /**
+   * Moves {@code f} into the change management root ({@code RecycleType.MOVE}).
+   *
+   * @throws MetaException wrapping any {@code IOException} raised by the recycle call
+   */
+  public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
+    try {
+      cm.recycle(f, RecycleType.MOVE, ifPurge);
+    } catch (IOException e) {
+      throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Checks whether {@code path} is an empty directory: its content summary reports zero files and
+   * exactly one directory (presumably {@code path} itself).
+   */
+  public boolean isEmpty(Path path) throws IOException, MetaException {
+    ContentSummary contents = getFs(path).getContentSummary(path);
+    return contents != null && contents.getFileCount() == 0 && contents.getDirectoryCount() == 1;
+  }
+
+  /**
+   * Checks whether the current user may write to {@code path}.
+   *
+   * @return true if storage authorization checks are disabled, if the path does not exist, or if
+   *     {@code HdfsUtils.checkFileAccess} grants WRITE access; false if {@code path} is null or
+   *     the access check fails with any other exception
+   */
+  public boolean isWritable(Path path) throws IOException {
+    if (!storageAuthCheck) {
+      // no checks for non-secure hadoop installations
+      return true;
+    }
+    if (path == null) {
+      // No path to check against; report it as not writable.
+      return false;
+    }
+    try {
+      FileSystem fs = getFs(path);
+      FileStatus stat = fs.getFileStatus(path);
+      HdfsUtils.checkFileAccess(fs, stat, FsAction.WRITE);
+      return true;
+    } catch (FileNotFoundException fnfe) {
+      // File named by path doesn't exist; nothing to validate.
+      return true;
+    } catch (Exception e) {
+      // all other exceptions are considered as emanating from
+      // unauthorized accesses
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception when checking if path (" + path + ") is writable", e);
+      }
+      return false;
+    }
+  }
+
+  private static String escapePathName(String path) {
+    return FileUtils.escapePathName(path);
+  }
+
+  private static String unescapePathName(String path) {
+    return FileUtils.unescapePathName(path);
+  }
+
+  /**
+   * Given a partition specification, return the path corresponding to the
+   * partition spec. By default, the specification does not include dynamic partitions.
+   * @param spec
+   * @return string representation of the partition specification, with a trailing separator.
+   * @throws MetaException if any value in {@code spec} is null or empty
+   */
+  public static String makePartPath(Map<String, String> spec)
+      throws MetaException {
+    return makePartName(spec, true);
+  }
+
+  /**
+   * Makes a partition name from a specification
+   * @param spec partition key/value pairs, in the order they should appear
+   * @param addTrailingSeparator if true, adds a trailing separator e.g. 'ds=1/'
+   * @return partition name with escaped keys and values
+   * @throws MetaException if any value in {@code spec} is null or empty
+   */
+  public static String makePartName(Map<String, String> spec,
+      boolean addTrailingSeparator)
+      throws MetaException {
+    StringBuilder suffixBuf = new StringBuilder();
+    int i = 0;
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() == null || e.getValue().length() == 0) {
+        throw new MetaException("Partition spec is incorrect. " + spec);
+      }
+      if (i > 0) {
+        suffixBuf.append(Path.SEPARATOR);
+      }
+      suffixBuf.append(escapePathName(e.getKey()));
+      suffixBuf.append('=');
+      suffixBuf.append(escapePathName(e.getValue()));
+      i++;
+    }
+    if (addTrailingSeparator) {
+      suffixBuf.append(Path.SEPARATOR);
+    }
+    return suffixBuf.toString();
+  }
+  /**
+   * Given a dynamic partition specification, return the path corresponding to the
+   * static part of partition specification. This is basically a copy of makePartName
+   * but we get rid of MetaException since it is not serializable.
+   * @param spec
+   * @return string representation of the static part of the partition specification.
+   */
+  public static String makeDynamicPartName(Map<String, String> spec) {
+    StringBuilder suffixBuf = new StringBuilder();
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() != null && e.getValue().length() > 0) {
+        suffixBuf.append(escapePathName(e.getKey()));
+        suffixBuf.append('=');
+        suffixBuf.append(escapePathName(e.getValue()));
+        suffixBuf.append(Path.SEPARATOR);
+      } else { // stop once we see a dynamic partition
+        break;
+      }
+    }
+    return suffixBuf.toString();
+  }
+
+  static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)");
+
+  private static final Pattern slash = Pattern.compile("/");
+
+  /**
+   * Extracts values from partition name without the column names.
+   * @param name Partition name, components separated by '/'.
+   * @param result The result. Its size must equal the number of components in {@code name};
+   *     if null, a new list of the right size is allocated.
+   * @return {@code result} (or the newly allocated list) with each element set to the unescaped
+   *     value of the corresponding component
+   * @throws MetaException if the component count does not match {@code result}, or a component
+   *     has no '=' or begins with '='
+   */
+  public static AbstractList<String> makeValsFromName(
+      String name, AbstractList<String> result) throws MetaException {
+    assert name != null;
+    String[] parts = slash.split(name, 0);
+    if (result == null) {
+      result = new ArrayList<>(parts.length);
+      for (int i = 0; i < parts.length; ++i) {
+        result.add(null);
+      }
+    } else if (parts.length != result.size()) {
+      throw new MetaException(
+          "Expected " + result.size() + " components, got " + parts.length + " (" + name + ")");
+    }
+    for (int i = 0; i < parts.length; ++i) {
+      int eq = parts[i].indexOf('=');
+      if (eq <= 0) {
+        throw new MetaException("Unexpected component " + parts[i]);
+      }
+      result.set(i, unescapePathName(parts[i].substring(eq + 1)));
+    }
+    return result;
+  }
+
+  /**
+   * Parses a partition name such as {@code ds=1/hr=2} into an ordered, unescaped spec.
+   *
+   * @throws MetaException if {@code name} is null or empty
+   */
+  public static LinkedHashMap<String, String> makeSpecFromName(String name)
+      throws MetaException {
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+    makeSpecFromName(partSpec, new Path(name), null);
+    return partSpec;
+  }
+
+  /**
+   * Adds every {@code key=value} component of {@code currPath} (unescaped, outermost first) to
+   * {@code partSpec}, removing each parsed key from {@code requiredKeys} when it is non-null.
+   *
+   * @return true if {@code requiredKeys} is null or every required key was found
+   */
+  public static boolean makeSpecFromName(Map<String, String> partSpec, Path currPath,
+      Set<String> requiredKeys) {
+    for (String[] kv : extractPartKeyValues(currPath, true)) {
+      if (requiredKeys != null) {
+        requiredKeys.remove(kv[0]);
+      }
+      partSpec.put(kv[0], kv[1]);
+    }
+    if (requiredKeys == null || requiredKeys.isEmpty()) {
+      return true;
+    }
+    // currPath is not modified above, so this reports the path that was actually parsed.
+    LOG.warn("Cannot create partition spec from " + currPath + "; missing keys " + requiredKeys);
+    return false;
+  }
+
+  /**
+   * Parses a partition name into an ordered spec, leaving keys and values escaped.
+   *
+   * @throws MetaException if {@code name} is null or empty
+   */
+  public static Map<String, String> makeEscSpecFromName(String name) throws MetaException {
+
+    if (name == null || name.isEmpty()) {
+      throw new MetaException("Partition name is invalid. " + name);
+    }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+    for (String[] kv : extractPartKeyValues(new Path(name), false)) {
+      partSpec.put(kv[0], kv[1]);
+    }
+    return partSpec;
+  }
+
+  /**
+   * Collects the {@code key=value} components of {@code leaf} and its ancestors, stopping at the
+   * root, and returns them ordered from the outermost (table-level) component to the leaf.
+   *
+   * @param leaf the deepest path to inspect; components not matching {@code pat} are skipped
+   * @param unescape whether to unescape keys and values with {@code unescapePathName}
+   * @return two-element {@code {key, value}} arrays, outermost first
+   */
+  private static List<String[]> extractPartKeyValues(Path leaf, boolean unescape) {
+    List<String[]> leafFirst = new ArrayList<>();
+    Path currPath = leaf;
+    do {
+      Matcher m = pat.matcher(currPath.getName());
+      if (m.matches()) {
+        String k = unescape ? unescapePathName(m.group(1)) : m.group(1);
+        String v = unescape ? unescapePathName(m.group(2)) : m.group(2);
+        leafFirst.add(new String[] {k, v});
+      }
+      currPath = currPath.getParent();
+    } while (currPath != null && !currPath.getName().isEmpty());
+
+    // Components were collected from the leaf dir up to the table's base dir; reverse them.
+    List<String[]> outermostFirst = new ArrayList<>(leafFirst.size());
+    for (int i = leafFirst.size() - 1; i >= 0; i--) {
+      outermostFirst.add(leafFirst.get(i));
+    }
+    return outermostFirst;
+  }
+
+  /**
+   * Returns the default partition path of a table within a given database and partition key value
+   * pairs. It uses the database location and appends it the table name and the partition key,value
+   * pairs to create the Path for the partition directory
+   *
+   * @param db - parent database which is used to get the base location of the partition directory
+   * @param table - table for the partitions
+   * @param pm - Partition key value pairs
+   * @return the default table path of {@code table} in {@code db} with the partition path appended
+   * @throws MetaException
+   */
+  public Path getDefaultPartitionPath(Database db, Table table,
+      Map<String, String> pm) throws MetaException {
+    return getPartitionPath(getDefaultTablePath(db, table), pm);
+  }
+
+  /**
+   * Returns the path object for the given partition key-value pairs and the base location
+   *
+   * @param tblPath - the base location for the partitions. Typically the table location
+   * @param pm - Partition key value pairs
+   * @return {@code tblPath} with {@code makePartPath(pm)} appended
+   * @throws MetaException if any value in {@code pm} is null or empty
+   */
+  public Path getPartitionPath(Path tblPath, Map<String, String> pm)
+      throws MetaException {
+    return new Path(tblPath, makePartPath(pm));
+  }
+
+  /**
+   * Given a database, a table and the partition key value pairs this method returns the Path object
+   * corresponding to the partition key value pairs. It uses the table location if available else
+   * uses the database location for constructing the path corresponding to the partition key-value
+   * pairs
+   *
+   * @param db - Parent database of the given table
+   * @param table - Table for which the partition key-values are given
+   * @param vals - List of values for the partition keys
+   * @return Path corresponding to the partition key-value pairs
+   * @throws MetaException if the table has no partition keys, {@code vals} is null, or the number
+   *     of values differs from the number of partition keys
+   */
+  public Path getPartitionPath(Database db, Table table, List<String> vals)
+      throws MetaException {
+    List<FieldSchema> partKeys = table.getPartitionKeys();
+    if (partKeys == null || vals == null || partKeys.size() != vals.size()) {
+      throw new MetaException("Invalid number of partition keys found for " + table.getTableName());
+    }
+    Map<String, String> pm = new LinkedHashMap<>(vals.size());
+    int i = 0;
+    for (FieldSchema key : partKeys) {
+      pm.put(key.getName(), vals.get(i));
+      i++;
+    }
+
+    if (table.getSd().getLocation() != null) {
+      return getPartitionPath(getDnsPath(new Path(table.getSd().getLocation())), pm);
+    } else {
+      return getDefaultPartitionPath(db, table, pm);
+    }
+  }
+
+  /**
+   * Checks whether {@code f} exists and is a directory.
+   *
+   * @return false if {@code f} does not exist or is not a directory
+   * @throws MetaException when the status lookup fails with any other {@code IOException}
+   */
+  public boolean isDir(Path f) throws MetaException {
+    try {
+      FileSystem fs = getFs(f);
+      FileStatus fstatus = fs.getFileStatus(f);
+      // FileStatus.isDir() is deprecated in Hadoop; isDirectory() is its replacement.
+      if (!fstatus.isDirectory()) {
+        return false;
+      }
+    } catch (FileNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      MetaStoreUtils.logAndThrowMetaException(e);
+    }
+    return true;
+  }
+
+  /** Equivalent to {@code makePartName(partCols, vals, null)}. */
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals) throws MetaException {
+    return makePartName(partCols, vals, null);
+  }
+
+  /**
+   * @param desc
+   * @return list of FileStatus objects corresponding to the files
+   *   making up the passed storage description
+   */
+  public List<FileStatus> getFileStatusesForSD(StorageDescriptor desc)
+      throws MetaException {
+    return getFileStatusesForLocation(desc.getLocation());
+  }
+
+  /**
+   * @param location
+   * @return list of FileStatus objects for the files found (via FileUtils.getFileStatusRecurse)
+   *   under {@code location}
+   */
+  public List<FileStatus> getFileStatusesForLocation(String location)
+      throws MetaException {
+    try {
+      Path path = new Path(location);
+      FileSystem fileSys = path.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(path, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * @param db database (not used by this method)
+   * @param table table
+   * @return list of FileStatus objects corresponding to the files making up the passed
+   * unpartitioned table
+   */
+  public List<FileStatus> getFileStatusesForUnpartitionedTable(Database db, Table table)
+      throws MetaException {
+    Path tablePath = getDnsPath(new Path(table.getSd().getLocation()));
+    try {
+      FileSystem fileSys = tablePath.getFileSystem(conf);
+      return FileUtils.getFileStatusRecurse(tablePath, -1, fileSys);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return null;
+  }
+
+  /**
+   * Makes a valid partition name.
+   * @param partCols The partition columns
+   * @param vals The partition values
+   * @param defaultStr
+   *    The default name given to a partition value if the respective value is empty or null.
+   * @return An escaped, valid partition name.
+   * @throws MetaException if there are no partition columns or the number of values differs
+   *    from the number of columns
+   */
+  public static String makePartName(List<FieldSchema> partCols,
+      List<String> vals, String defaultStr) throws MetaException {
+    if ((partCols.size() != vals.size()) || (partCols.size() == 0)) {
+      StringBuilder errorStr = new StringBuilder("Invalid partition key & values; keys [");
+      for (FieldSchema fs : partCols) {
+        errorStr.append(fs.getName()).append(", ");
+      }
+      errorStr.append("], values [");
+      for (String val : vals) {
+        errorStr.append(val).append(", ");
+      }
+      throw new MetaException(errorStr.append("]").toString());
+    }
+    List<String> colNames = new ArrayList<>(partCols.size());
+    for (FieldSchema col: partCols) {
+      colNames.add(col.getName());
+    }
+    return FileUtils.makePartName(colNames, vals, defaultStr);
+  }
+
+  /**
+   * Returns the unescaped partition values of {@code partName}, outermost key first.
+   *
+   * @throws MetaException if {@code partName} is null or empty
+   */
+  public static List<String> getPartValuesFromPartName(String partName)
+      throws MetaException {
+    LinkedHashMap<String, String> partSpec = Warehouse.makeSpecFromName(partName);
+    return new ArrayList<>(partSpec.values());
+  }
+
+  /**
+   * Pairs each value with the partition column at the same index, preserving order. Assumes
+   * {@code partCols} has at least as many entries as {@code values}.
+   */
+  public static Map<String, String> makeSpecFromValues(List<FieldSchema> partCols,
+      List<String> values) {
+    Map<String, String> spec = new LinkedHashMap<>();
+    for (int i = 0; i < values.size(); i++) {
+      spec.put(partCols.get(i).getName(), values.get(i));
+    }
+    return spec;
+  }
+}
