http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java deleted file mode 100644 index 851538a..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; -import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableGaugeInt; -import org.apache.hadoop.metrics2.source.JvmMetrics; - -/** - * This class is for maintaining the various Router activity statistics - * and publishing them through the metrics interfaces. - */ -@Metrics(name="RouterActivity", about="Router metrics", context="dfs") -public class RouterMetrics { - - private final MetricsRegistry registry = new MetricsRegistry("router"); - - @Metric("Duration in SafeMode at startup in msec") - private MutableGaugeInt safeModeTime; - - private JvmMetrics jvmMetrics = null; - - RouterMetrics( - String processName, String sessionId, final JvmMetrics jvmMetrics) { - this.jvmMetrics = jvmMetrics; - registry.tag(ProcessName, processName).tag(SessionId, sessionId); - } - - public static RouterMetrics create(Configuration conf) { - String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); - String processName = "Router"; - MetricsSystem ms = DefaultMetricsSystem.instance(); - JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms); - - return ms.register(new RouterMetrics(processName, sessionId, jm)); - } - - public JvmMetrics getJvmMetrics() { - return jvmMetrics; - } - - public void shutdown() { - DefaultMetricsSystem.shutdown(); - } - - public void setSafeModeTime(long elapsed) { - safeModeTime.set((int) elapsed); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java deleted file mode 100644 index f4debce..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; -import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; -import org.apache.hadoop.metrics2.source.JvmMetrics; -import org.apache.hadoop.service.AbstractService; - -/** - * Service to manage the metrics of the Router. - */ -public class RouterMetricsService extends AbstractService { - - /** Router for this metrics. */ - private final Router router; - - /** Router metrics. */ - private RouterMetrics routerMetrics; - /** Federation metrics. */ - private FederationMetrics federationMetrics; - /** Namenode mock metrics. */ - private NamenodeBeanMetrics nnMetrics; - - - public RouterMetricsService(final Router router) { - super(RouterMetricsService.class.getName()); - this.router = router; - } - - @Override - protected void serviceInit(Configuration configuration) throws Exception { - this.routerMetrics = RouterMetrics.create(configuration); - } - - @Override - protected void serviceStart() throws Exception { - // Wrapper for all the FSNamesystem JMX interfaces - this.nnMetrics = new NamenodeBeanMetrics(this.router); - - // Federation MBean JMX interface - this.federationMetrics = new FederationMetrics(this.router); - } - - @Override - protected void serviceStop() throws Exception { - // Remove JMX interfaces - if (this.federationMetrics != null) { - this.federationMetrics.close(); - } - - // Remove Namenode JMX interfaces - if (this.nnMetrics != null) { - this.nnMetrics.close(); - } - - // Shutdown metrics - if (this.routerMetrics != null) { - this.routerMetrics.shutdown(); - } - } - - /** - * Get the metrics system for the Router. - * - * @return Router metrics. - */ - public RouterMetrics getRouterMetrics() { - return this.routerMetrics; - } - - /** - * Get the federation metrics. - * - * @return Federation metrics. - */ - public FederationMetrics getFederationMetrics() { - return this.federationMetrics; - } - - /** - * Get the JVM metrics for the Router. - * - * @return JVM metrics. - */ - public JvmMetrics getJvmMetrics() { - if (this.routerMetrics == null) { - return null; - } - return this.routerMetrics.getJvmMetrics(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java deleted file mode 100644 index 9d81dce..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; - -/** - * Class that helps in checking permissions in Router-based federation. - */ -public class RouterPermissionChecker extends FSPermissionChecker { - static final Log LOG = LogFactory.getLog(RouterPermissionChecker.class); - - /** Mount table default permission. */ - public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755; - - public RouterPermissionChecker(String routerOwner, String supergroup, - UserGroupInformation callerUgi) { - super(routerOwner, supergroup, callerUgi, null); - } - - /** - * Whether a mount table entry can be accessed by the current context. - * - * @param mountTable - * MountTable being accessed - * @param access - * type of action being performed on the cache pool - * @throws AccessControlException - * if mount table cannot be accessed - */ - public void checkPermission(MountTable mountTable, FsAction access) - throws AccessControlException { - if (isSuperUser()) { - return; - } - - FsPermission mode = mountTable.getMode(); - if (getUser().equals(mountTable.getOwnerName()) - && mode.getUserAction().implies(access)) { - return; - } - - if (isMemberOfGroup(mountTable.getGroupName()) - && mode.getGroupAction().implies(access)) { - return; - } - - if (!getUser().equals(mountTable.getOwnerName()) - && !isMemberOfGroup(mountTable.getGroupName()) - && mode.getOtherAction().implies(access)) { - return; - } - - throw new AccessControlException( - "Permission denied while accessing mount table " - + mountTable.getSourcePath() - + ": user " + getUser() + " does not have " + access.toString() - + " permissions."); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java deleted file mode 100644 index 0df34fc..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry; - -import java.util.HashSet; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; - -/** - * Router quota manager in Router. The manager maintains - * {@link RouterQuotaUsage} cache of mount tables and do management - * for the quota caches. - */ -public class RouterQuotaManager { - /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */ - private TreeMap<String, RouterQuotaUsage> cache; - - /** Lock to access the quota cache. */ - private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private final Lock readLock = readWriteLock.readLock(); - private final Lock writeLock = readWriteLock.writeLock(); - - public RouterQuotaManager() { - this.cache = new TreeMap<>(); - } - - /** - * Get all the mount quota paths. - */ - public Set<String> getAll() { - readLock.lock(); - try { - return this.cache.keySet(); - } finally { - readLock.unlock(); - } - } - - /** - * Get the nearest ancestor's quota usage, and meanwhile its quota was set. - * @param path The path being written. - * @return RouterQuotaUsage Quota usage. - */ - public RouterQuotaUsage getQuotaUsage(String path) { - readLock.lock(); - try { - RouterQuotaUsage quotaUsage = this.cache.get(path); - if (quotaUsage != null && isQuotaSet(quotaUsage)) { - return quotaUsage; - } - - // If not found, look for its parent path usage value. - int pos = path.lastIndexOf(Path.SEPARATOR); - if (pos != -1) { - String parentPath = path.substring(0, pos); - return getQuotaUsage(parentPath); - } - } finally { - readLock.unlock(); - } - - return null; - } - - /** - * Get children paths (can including itself) under specified federation path. - * @param parentPath - * @return Set<String> Children path set. - */ - public Set<String> getPaths(String parentPath) { - readLock.lock(); - try { - String from = parentPath; - String to = parentPath + Character.MAX_VALUE; - SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to); - - Set<String> validPaths = new HashSet<>(); - if (subMap != null) { - for (String path : subMap.keySet()) { - if (isParentEntry(path, parentPath)) { - validPaths.add(path); - } - } - } - return validPaths; - } finally { - readLock.unlock(); - } - } - - /** - * Put new entity into cache. - * @param path Mount table path. - * @param quotaUsage Corresponding cache value. - */ - public void put(String path, RouterQuotaUsage quotaUsage) { - writeLock.lock(); - try { - this.cache.put(path, quotaUsage); - } finally { - writeLock.unlock(); - } - } - - /** - * Remove the entity from cache. - * @param path Mount table path. - */ - public void remove(String path) { - writeLock.lock(); - try { - this.cache.remove(path); - } finally { - writeLock.unlock(); - } - } - - /** - * Clean up the cache. - */ - public void clear() { - writeLock.lock(); - try { - this.cache.clear(); - } finally { - writeLock.unlock(); - } - } - - /** - * Check if the quota was set. - * @param quota RouterQuotaUsage set in mount table. - */ - public boolean isQuotaSet(RouterQuotaUsage quota) { - if (quota != null) { - long nsQuota = quota.getQuota(); - long ssQuota = quota.getSpaceQuota(); - - // once nsQuota or ssQuota was set, this mount table is quota set - if (nsQuota != HdfsConstants.QUOTA_DONT_SET - || ssQuota != HdfsConstants.QUOTA_DONT_SET) { - return true; - } - } - - return false; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java deleted file mode 100644 index 80abc11..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java +++ /dev/null @@ -1,228 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; -import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to periodically update the {@link RouterQuotaUsage} - * cached information in the {@link Router} and update corresponding - * mount table in State Store. - */ -public class RouterQuotaUpdateService extends PeriodicService { - private static final Logger LOG = - LoggerFactory.getLogger(RouterQuotaUpdateService.class); - - private MountTableStore mountTableStore; - private RouterRpcServer rpcServer; - /** Router using this Service. */ - private final Router router; - /** Router Quota manager. */ - private RouterQuotaManager quotaManager; - - public RouterQuotaUpdateService(final Router router) throws IOException { - super(RouterQuotaUpdateService.class.getName()); - this.router = router; - this.rpcServer = router.getRpcServer(); - this.quotaManager = router.getQuotaManager(); - - if (this.quotaManager == null) { - throw new IOException("Router quota manager is not initialized."); - } - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.setIntervalMs(conf.getTimeDuration( - DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, - DFSConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS)); - - super.serviceInit(conf); - } - - @Override - protected void periodicInvoke() { - LOG.debug("Start to update quota cache."); - try { - List<MountTable> updateMountTables = new LinkedList<>(); - List<MountTable> mountTables = getQuotaSetMountTables(); - for (MountTable entry : mountTables) { - String src = entry.getSourcePath(); - RouterQuotaUsage oldQuota = entry.getQuota(); - long nsQuota = oldQuota.getQuota(); - long ssQuota = oldQuota.getSpaceQuota(); - // Call RouterRpcServer#getQuotaUsage for getting current quota usage. - QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule() - .getQuotaUsage(src); - // If quota is not set in some subclusters under federation path, - // set quota for this path. - if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) { - this.rpcServer.setQuota(src, nsQuota, ssQuota, null); - } - - RouterQuotaUsage newQuota = generateNewQuota(oldQuota, - currentQuotaUsage); - this.quotaManager.put(src, newQuota); - entry.setQuota(newQuota); - - // only update mount tables which quota was changed - if (!oldQuota.equals(newQuota)) { - updateMountTables.add(entry); - - LOG.debug( - "Update quota usage entity of path: {}, nsCount: {}," - + " nsQuota: {}, ssCount: {}, ssQuota: {}.", - src, newQuota.getFileAndDirectoryCount(), - newQuota.getQuota(), newQuota.getSpaceConsumed(), - newQuota.getSpaceQuota()); - } - } - - updateMountTableEntries(updateMountTables); - } catch (IOException e) { - LOG.error("Quota cache updated error.", e); - } - } - - /** - * Get mount table store management interface. - * @return MountTableStore instance. - * @throws IOException - */ - private MountTableStore getMountTableStore() throws IOException { - if (this.mountTableStore == null) { - this.mountTableStore = router.getStateStore().getRegisteredRecordStore( - MountTableStore.class); - if (this.mountTableStore == null) { - throw new IOException("Mount table state store is not available."); - } - } - return this.mountTableStore; - } - - /** - * Get all the existing mount tables. - * @return List of mount tables. - * @throws IOException - */ - private List<MountTable> getMountTableEntries() throws IOException { - // scan mount tables from root path - GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest - .newInstance("/"); - GetMountTableEntriesResponse getResponse = getMountTableStore() - .getMountTableEntries(getRequest); - return getResponse.getEntries(); - } - - /** - * Get mount tables which quota was set. - * During this time, the quota usage cache will also be updated by - * quota manager: - * 1. Stale paths (entries) will be removed. - * 2. Existing entries will be override and updated. - * @return List of mount tables which quota was set. - * @throws IOException - */ - private List<MountTable> getQuotaSetMountTables() throws IOException { - List<MountTable> mountTables = getMountTableEntries(); - Set<String> stalePaths = new HashSet<>(); - for (String path : this.quotaManager.getAll()) { - stalePaths.add(path); - } - - List<MountTable> neededMountTables = new LinkedList<>(); - for (MountTable entry : mountTables) { - // select mount tables which is quota set - if (isQuotaSet(entry)) { - neededMountTables.add(entry); - } - - // update mount table entries info in quota cache - String src = entry.getSourcePath(); - this.quotaManager.put(src, entry.getQuota()); - stalePaths.remove(src); - } - - // remove stale paths that currently cached - for (String stalePath : stalePaths) { - this.quotaManager.remove(stalePath); - } - - return neededMountTables; - } - - /** - * Check if the quota was set in given MountTable. - * @param mountTable Mount table entry. - */ - private boolean isQuotaSet(MountTable mountTable) { - if (mountTable != null) { - return this.quotaManager.isQuotaSet(mountTable.getQuota()); - } - return false; - } - - /** - * Generate a new quota based on old quota and current quota usage value. - * @param oldQuota Old quota stored in State Store. - * @param currentQuotaUsage Current quota usage value queried from - * subcluster. - * @return A new RouterQuotaUsage. - */ - private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota, - QuotaUsage currentQuotaUsage) { - RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder() - .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount()) - .quota(oldQuota.getQuota()) - .spaceConsumed(currentQuotaUsage.getSpaceConsumed()) - .spaceQuota(oldQuota.getSpaceQuota()).build(); - return newQuota; - } - - /** - * Write out updated mount table entries into State Store. - * @param updateMountTables Mount tables to be updated. - * @throws IOException - */ - private void updateMountTableEntries(List<MountTable> updateMountTables) - throws IOException { - for (MountTable entry : updateMountTables) { - UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest - .newInstance(entry); - getMountTableStore().updateMountTableEntry(updateRequest); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java deleted file mode 100644 index eedd80f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature; -import org.apache.hadoop.hdfs.server.namenode.Quota; -import org.apache.hadoop.util.StringUtils; - -/** - * The subclass of {@link QuotaUsage} used in Router-based federation. - */ -public final class RouterQuotaUsage extends QuotaUsage { - - /** Default quota usage count. */ - public static final long QUOTA_USAGE_COUNT_DEFAULT = 0; - - private RouterQuotaUsage(Builder builder) { - super(builder); - } - - /** Build the instance based on the builder. */ - public static class Builder extends QuotaUsage.Builder { - - public RouterQuotaUsage build() { - return new RouterQuotaUsage(this); - } - - @Override - public Builder fileAndDirectoryCount(long count) { - super.fileAndDirectoryCount(count); - return this; - } - - @Override - public Builder quota(long quota) { - super.quota(quota); - return this; - } - - @Override - public Builder spaceConsumed(long spaceConsumed) { - super.spaceConsumed(spaceConsumed); - return this; - } - - @Override - public Builder spaceQuota(long spaceQuota) { - super.spaceQuota(spaceQuota); - return this; - } - } - - /** - * Verify if namespace quota is violated once quota is set. Relevant - * method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}. - * @throws NSQuotaExceededException - */ - public void verifyNamespaceQuota() throws NSQuotaExceededException { - if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) { - throw new NSQuotaExceededException(getQuota(), - getFileAndDirectoryCount()); - } - } - - /** - * Verify if storage space quota is violated once quota is set. Relevant - * method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}. - * @throws DSQuotaExceededException - */ - public void verifyStoragespaceQuota() throws DSQuotaExceededException { - if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) { - throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed()); - } - } - - @Override - public String toString() { - String nsQuota = String.valueOf(getQuota()); - String nsCount = String.valueOf(getFileAndDirectoryCount()); - if (getQuota() == HdfsConstants.QUOTA_DONT_SET) { - nsQuota = "-"; - nsCount = "-"; - } - - String ssQuota = StringUtils.byteDesc(getSpaceQuota()); - String ssCount = StringUtils.byteDesc(getSpaceConsumed()); - if (getSpaceQuota() == HdfsConstants.QUOTA_DONT_SET) { - ssQuota = "-"; - ssCount = "-"; - } - - StringBuilder str = new StringBuilder(); - str.append("[NsQuota: ").append(nsQuota).append("/") - .append(nsCount); - str.append(", SsQuota: ").append(ssQuota) - .append("/").append(ssCount) - .append("]"); - return str.toString(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java deleted file mode 100644 index d3b7947..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ /dev/null @@ -1,1022 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.router; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; -import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; -import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; -import org.apache.hadoop.io.retry.RetryPolicies; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.ipc.StandbyException; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A client proxy for Router -> NN communication using the NN ClientProtocol. - * <p> - * Provides routers to invoke remote ClientProtocol methods and handle - * retries/failover. - * <ul> - * <li>invokeSingle Make a single request to a single namespace - * <li>invokeSequential Make a sequential series of requests to multiple - * ordered namespaces until a condition is met. - * <li>invokeConcurrent Make concurrent requests to multiple namespaces and - * return all of the results. - * </ul> - * Also maintains a cached pool of connections to NNs. Connections are managed - * by the ConnectionManager and are unique to each user + NN. The size of the - * connection pool can be configured. Larger pools allow for more simultaneous - * requests to a single NN from a single user. - */ -public class RouterRpcClient { - - private static final Logger LOG = - LoggerFactory.getLogger(RouterRpcClient.class); - - - /** Router identifier. */ - private final String routerId; - - /** Interface to identify the active NN for a nameservice or blockpool ID. */ - private final ActiveNamenodeResolver namenodeResolver; - - /** Connection pool to the Namenodes per user for performance. */ - private final ConnectionManager connectionManager; - /** Service to run asynchronous calls. */ - private final ExecutorService executorService; - /** Retry policy for router -> NN communication. */ - private final RetryPolicy retryPolicy; - /** Optional perf monitor. */ - private final RouterRpcMonitor rpcMonitor; - - /** Pattern to parse a stack trace line. */ - private static final Pattern STACK_TRACE_PATTERN = - Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); - - - /** - * Create a router RPC client to manage remote procedure calls to NNs. - * - * @param conf Hdfs Configuation. - * @param resolver A NN resolver to determine the currently active NN in HA. - * @param monitor Optional performance monitor. - */ - public RouterRpcClient(Configuration conf, String identifier, - ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { - this.routerId = identifier; - - this.namenodeResolver = resolver; - - this.connectionManager = new ConnectionManager(conf); - this.connectionManager.start(); - - int numThreads = conf.getInt( - DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, - DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat("RPC Router Client-%d") - .build(); - this.executorService = Executors.newFixedThreadPool( - numThreads, threadFactory); - - this.rpcMonitor = monitor; - - int maxFailoverAttempts = conf.getInt( - HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, - HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); - int maxRetryAttempts = conf.getInt( - DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS, - DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT); - int failoverSleepBaseMillis = conf.getInt( - HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, - HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); - int failoverSleepMaxMillis = conf.getInt( - HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, - HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); - this.retryPolicy = RetryPolicies.failoverOnNetworkException( - RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, - failoverSleepBaseMillis, failoverSleepMaxMillis); - } - - /** - * Get the active namenode resolver used by this client. - * @return Active namenode resolver. - */ - public ActiveNamenodeResolver getNamenodeResolver() { - return this.namenodeResolver; - } - - /** - * Shutdown the client. - */ - public void shutdown() { - if (this.connectionManager != null) { - this.connectionManager.close(); - } - if (this.executorService != null) { - this.executorService.shutdownNow(); - } - } - - /** - * Total number of available sockets between the router and NNs. - * - * @return Number of namenode clients. - */ - public int getNumConnections() { - return this.connectionManager.getNumConnections(); - } - - /** - * Total number of available sockets between the router and NNs. - * - * @return Number of namenode clients. - */ - public int getNumActiveConnections() { - return this.connectionManager.getNumActiveConnections(); - } - - /** - * Total number of open connection pools to a NN. Each connection pool. - * represents one user + one NN. - * - * @return Number of connection pools. - */ - public int getNumConnectionPools() { - return this.connectionManager.getNumConnectionPools(); - } - - /** - * Number of connections between the router and NNs being created sockets. - * - * @return Number of connections waiting to be created. - */ - public int getNumCreatingConnections() { - return this.connectionManager.getNumCreatingConnections(); - } - - /** - * JSON representation of the connection pool. - * - * @return String representation of the JSON. - */ - public String getJSON() { - return this.connectionManager.getJSON(); - } - - /** - * Get ClientProtocol proxy client for a NameNode. Each combination of user + - * NN must use a unique proxy client. Previously created clients are cached - * and stored in a connection pool by the ConnectionManager. - * - * @param ugi User group information. - * @param nsId Nameservice identifier. - * @param rpcAddress ClientProtocol RPC server address of the NN. - * @return ConnectionContext containing a ClientProtocol proxy client for the - * NN + current user. - * @throws IOException If we cannot get a connection to the NameNode. - */ - private ConnectionContext getConnection( - UserGroupInformation ugi, String nsId, String rpcAddress) - throws IOException { - ConnectionContext connection = null; - try { - // Each proxy holds the UGI info for the current user when it is created. - // This cache does not scale very well, one entry per user per namenode, - // and may need to be adjusted and/or selectively pruned. The cache is - // important due to the excessive overhead of creating a new proxy wrapper - // for each individual request. - - // TODO Add tokens from the federated UGI - connection = this.connectionManager.getConnection(ugi, rpcAddress); - LOG.debug("User {} NN {} is using connection {}", - ugi.getUserName(), rpcAddress, connection); - } catch (Exception ex) { - LOG.error("Cannot open NN client to address: {}", rpcAddress, ex); - } - - if (connection == null) { - throw new IOException("Cannot get a connection to " + rpcAddress); - } - return connection; - } - - /** - * Convert an exception to an IOException. - * - * For a non-IOException, wrap it with IOException. For a RemoteException, - * unwrap it. For an IOException which is not a RemoteException, return it. - * - * @param e Exception to convert into an exception. - * @return Created IO exception. - */ - private static IOException toIOException(Exception e) { - if (e instanceof RemoteException) { - return ((RemoteException) e).unwrapRemoteException(); - } - if (e instanceof IOException) { - return (IOException)e; - } - return new IOException(e); - } - - /** - * If we should retry the RPC call. - * - * @param ioe IOException reported. - * @param retryCount Number of retries. - * @param nsId Nameservice ID. - * @return Retry decision. - * @throws IOException Original exception if the retry policy generates one - * or IOException for no available namenodes. - */ - private RetryDecision shouldRetry(final IOException ioe, final int retryCount, - final String nsId) throws IOException { - // check for the case of cluster unavailable state - if (isClusterUnAvailable(nsId)) { - // we allow to retry once if cluster is unavailable - if (retryCount == 0) { - return RetryDecision.RETRY; - } else { - throw new IOException("No namenode available under nameservice " + nsId, - ioe); - } - } - - try { - final RetryPolicy.RetryAction a = - this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); - return a.action; - } catch (Exception ex) { - LOG.error("Re-throwing API exception, no more retries", ex); - throw toIOException(ex); - } - } - - /** - * Invokes a method against the ClientProtocol proxy server. If a standby - * exception is generated by the call to the client, retries using the - * alternate server. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param ugi User group information. - * @param namenodes A prioritized list of namenodes within the same - * nameservice. - * @param method Remote ClientProtcol method to invoke. - * @param params Variable list of parameters matching the method. - * @return The result of invoking the method. - * @throws IOException - */ - private Object invokeMethod( - final UserGroupInformation ugi, - final List<? extends FederationNamenodeContext> namenodes, - final Method method, final Object... params) throws IOException { - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("No namenodes to invoke " + method.getName() + - " with params " + Arrays.toString(params) + " from " + this.routerId); - } - - Object ret = null; - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - boolean failover = false; - Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); - for (FederationNamenodeContext namenode : namenodes) { - ConnectionContext connection = null; - try { - String nsId = namenode.getNameserviceId(); - String rpcAddress = namenode.getRpcAddress(); - connection = this.getConnection(ugi, nsId, rpcAddress); - ProxyAndInfo<ClientProtocol> client = connection.getClient(); - ClientProtocol proxy = client.getProxy(); - ret = invoke(nsId, 0, method, proxy, params); - if (failover) { - // Success on alternate server, update - InetSocketAddress address = client.getAddress(); - namenodeResolver.updateActiveNamenode(nsId, address); - } - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); - } - return ret; - } catch (IOException ioe) { - ioes.put(namenode, ioe); - if (ioe instanceof StandbyException) { - // Fail over indicated by retry policy and/or NN - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureStandby(); - } - failover = true; - } else if (ioe instanceof RemoteException) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true); - } - // RemoteException returned by NN - throw (RemoteException) ioe; - } else { - // Other communication error, this is a failure - // Communication retries are handled by the retry policy - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpFailureCommunicate(); - this.rpcMonitor.proxyOpComplete(false); - } - throw ioe; - } - } finally { - if (connection != null) { - connection.release(); - } - } - } - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(false); - } - - // All namenodes were unavailable or in standby - String msg = "No namenode available to invoke " + method.getName() + " " + - Arrays.toString(params); - LOG.error(msg); - for (Entry<FederationNamenodeContext, IOException> entry : - ioes.entrySet()) { - FederationNamenodeContext namenode = entry.getKey(); - String nsId = namenode.getNameserviceId(); - String nnId = namenode.getNamenodeId(); - String addr = namenode.getRpcAddress(); - IOException ioe = entry.getValue(); - if (ioe instanceof StandbyException) { - LOG.error("{} {} at {} is in Standby", nsId, nnId, addr); - } else { - LOG.error("{} {} at {} error: \"{}\"", - nsId, nnId, addr, ioe.getMessage()); - } - } - throw new StandbyException(msg); - } - - /** - * Invokes a method on the designated object. Catches exceptions specific to - * the invocation. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param nsId Identifier for the namespace - * @param retryCount Current retry times - * @param method Method to invoke - * @param obj Target object for the method - * @param params Variable parameters - * @return Response from the remote server - * @throws IOException - * @throws InterruptedException - */ - private Object invoke(String nsId, int retryCount, final Method method, - final Object obj, final Object... params) throws IOException { - try { - return method.invoke(obj, params); - } catch (IllegalAccessException e) { - LOG.error("Unexpected exception while proxying API", e); - return null; - } catch (IllegalArgumentException e) { - LOG.error("Unexpected exception while proxying API", e); - return null; - } catch (InvocationTargetException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException) { - IOException ioe = (IOException) cause; - - // Check if we should retry. - RetryDecision decision = shouldRetry(ioe, retryCount, nsId); - if (decision == RetryDecision.RETRY) { - if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpRetries(); - } - - // retry - return invoke(nsId, ++retryCount, method, obj, params); - } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { - // failover, invoker looks for standby exceptions for failover. - if (ioe instanceof StandbyException) { - throw ioe; - } else { - throw new StandbyException(ioe.getMessage()); - } - } else { - if (ioe instanceof RemoteException) { - RemoteException re = (RemoteException) ioe; - ioe = re.unwrapRemoteException(); - ioe = getCleanException(ioe); - } - throw ioe; - } - } else { - throw new IOException(e); - } - } - } - - /** - * Check if the cluster of given nameservice id is available. - * @param nsId nameservice ID. - * @return - * @throws IOException - */ - private boolean isClusterUnAvailable(String nsId) throws IOException { - List<? extends FederationNamenodeContext> nnState = this.namenodeResolver - .getNamenodesForNameserviceId(nsId); - - if (nnState != null) { - for (FederationNamenodeContext nnContext : nnState) { - // Once we find one NN is in active state, we assume this - // cluster is available. - if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) { - return false; - } - } - } - - return true; - } - - /** - * Get a clean copy of the exception. Sometimes the exceptions returned by the - * server contain the full stack trace in the message. - * - * @param ioe Exception to clean up. - * @return Copy of the original exception with a clean message. - */ - private static IOException getCleanException(IOException ioe) { - IOException ret = null; - - String msg = ioe.getMessage(); - Throwable cause = ioe.getCause(); - StackTraceElement[] stackTrace = ioe.getStackTrace(); - - // Clean the message by removing the stack trace - int index = msg.indexOf("\n"); - if (index > 0) { - String[] msgSplit = msg.split("\n"); - msg = msgSplit[0]; - - // Parse stack trace from the message - List<StackTraceElement> elements = new LinkedList<>(); - for (int i=1; i<msgSplit.length; i++) { - String line = msgSplit[i]; - Matcher matcher = STACK_TRACE_PATTERN.matcher(line); - if (matcher.find()) { - String declaringClass = matcher.group(1); - String methodName = matcher.group(2); - String fileName = matcher.group(3); - int lineNumber = Integer.parseInt(matcher.group(4)); - StackTraceElement element = new StackTraceElement( - declaringClass, methodName, fileName, lineNumber); - elements.add(element); - } - } - stackTrace = elements.toArray(new StackTraceElement[elements.size()]); - } - - // Create the new output exception - if (ioe instanceof RemoteException) { - RemoteException re = (RemoteException)ioe; - ret = new RemoteException(re.getClassName(), msg); - } else { - // Try the simple constructor and initialize the fields - Class<? extends IOException> ioeClass = ioe.getClass(); - try { - Constructor<? extends IOException> constructor = - ioeClass.getDeclaredConstructor(String.class); - ret = constructor.newInstance(msg); - } catch (ReflectiveOperationException e) { - // If there are errors, just use the input one - LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e); - ret = ioe; - } - } - if (ret != null) { - ret.initCause(cause); - ret.setStackTrace(stackTrace); - } - - return ret; - } - - /** - * Invokes a ClientProtocol method. Determines the target nameservice via a - * provided block. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param block Block used to determine appropriate nameservice. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingle(final ExtendedBlock block, RemoteMethod method) - throws IOException { - String bpId = block.getBlockPoolId(); - return invokeSingleBlockPool(bpId, method); - } - - /** - * Invokes a ClientProtocol method. Determines the target nameservice using - * the block pool id. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param bpId Block pool identifier. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingleBlockPool(final String bpId, RemoteMethod method) - throws IOException { - String nsId = getNameserviceForBlockPoolId(bpId); - return invokeSingle(nsId, method); - } - - /** - * Invokes a ClientProtocol method against the specified namespace. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param nsId Target namespace for the method. - * @param method The remote method and parameters to invoke. - * @return The result of invoking the method. - * @throws IOException - */ - public Object invokeSingle(final String nsId, RemoteMethod method) - throws IOException { - UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - List<? extends FederationNamenodeContext> nns = - getNamenodesForNameservice(nsId); - RemoteLocationContext loc = new RemoteLocation(nsId, "/"); - return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc)); - } - - /** - * Invokes a single proxy call for a single location. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param location RemoteLocation to invoke. - * @param remoteMethod The remote method and parameters to invoke. - * @return The result of invoking the method if successful. - * @throws IOException - */ - public Object invokeSingle(final RemoteLocationContext location, - RemoteMethod remoteMethod) throws IOException { - List<RemoteLocationContext> locations = Collections.singletonList(location); - return invokeSequential(locations, remoteMethod); - } - - /** - * Invokes sequential proxy calls to different locations. Continues to invoke - * calls until a call returns without throwing a remote exception. - * - * @param locations List of locations/nameservices to call concurrently. - * @param remoteMethod The remote method and parameters to invoke. - * @return The result of the first successful call, or if no calls are - * successful, the result of the last RPC call executed. - * @throws IOException if the success condition is not met and one of the RPC - * calls generated a remote exception. - */ - public Object invokeSequential( - final List<? extends RemoteLocationContext> locations, - final RemoteMethod remoteMethod) throws IOException { - return invokeSequential(locations, remoteMethod, null, null); - } - - /** - * Invokes sequential proxy calls to different locations. Continues to invoke - * calls until the success condition is met, or until all locations have been - * attempted. - * - * The success condition may be specified by: - * <ul> - * <li>An expected result class - * <li>An expected result value - * </ul> - * - * If no expected result class/values are specified, the success condition is - * a call that does not throw a remote exception. - * - * @param locations List of locations/nameservices to call concurrently. - * @param remoteMethod The remote method and parameters to invoke. - * @param expectedResultClass In order to be considered a positive result, the - * return type must be of this class. - * @param expectedResultValue In order to be considered a positive result, the - * return value must equal the value of this object. - * @return The result of the first successful call, or if no calls are - * successful, the result of the first RPC call executed. - * @throws IOException if the success condition is not met, return the first - * remote exception generated. - */ - public <T> T invokeSequential( - final List<? extends RemoteLocationContext> locations, - final RemoteMethod remoteMethod, Class<T> expectedResultClass, - Object expectedResultValue) throws IOException { - - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - final Method m = remoteMethod.getMethod(); - IOException firstThrownException = null; - IOException lastThrownException = null; - Object firstResult = null; - // Invoke in priority order - for (final RemoteLocationContext loc : locations) { - String ns = loc.getNameserviceId(); - List<? extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(ns); - try { - Object[] params = remoteMethod.getParams(loc); - Object result = invokeMethod(ugi, namenodes, m, params); - // Check if the result is what we expected - if (isExpectedClass(expectedResultClass, result) && - isExpectedValue(expectedResultValue, result)) { - // Valid result, stop here - @SuppressWarnings("unchecked") - T ret = (T)result; - return ret; - } - if (firstResult == null) { - firstResult = result; - } - } catch (IOException ioe) { - // Record it and move on - lastThrownException = (IOException) ioe; - if (firstThrownException == null) { - firstThrownException = lastThrownException; - } - } catch (Exception e) { - // Unusual error, ClientProtocol calls always use IOException (or - // RemoteException). Re-wrap in IOException for compatibility with - // ClientProtcol. - LOG.error("Unexpected exception {} proxying {} to {}", - e.getClass(), m.getName(), ns, e); - lastThrownException = new IOException( - "Unexpected exception proxying API " + e.getMessage(), e); - if (firstThrownException == null) { - firstThrownException = lastThrownException; - } - } - } - - if (firstThrownException != null) { - // re-throw the last exception thrown for compatibility - throw firstThrownException; - } - // Return the last result, whether it is the value we are looking for or a - @SuppressWarnings("unchecked") - T ret = (T)firstResult; - return ret; - } - - /** - * Checks if a result matches the required result class. - * - * @param expectedClass Required result class, null to skip the check. - * @param clazz The result to check. - * @return True if the result is an instance of the required class or if the - * expected class is null. - */ - private static boolean isExpectedClass(Class<?> expectedClass, Object clazz) { - if (expectedClass == null) { - return true; - } else if (clazz == null) { - return false; - } else { - return expectedClass.isInstance(clazz); - } - } - - /** - * Checks if a result matches the expected value. - * - * @param expectedValue The expected value, null to skip the check. - * @param value The result to check. - * @return True if the result is equals to the expected value or if the - * expected value is null. - */ - private static boolean isExpectedValue(Object expectedValue, Object value) { - if (expectedValue == null) { - return true; - } else if (value == null) { - return false; - } else { - return value.equals(expectedValue); - } - } - - /** - * Invoke multiple concurrent proxy calls to different clients. Returns an - * array of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param locations List of remote locations to call concurrently. - * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @throws IOException If all the calls throw an exception. - */ - public <T extends RemoteLocationContext, R> void invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby) throws IOException { - invokeConcurrent(locations, method, requireResponse, standby, void.class); - } - - /** - * Invokes multiple concurrent proxy calls to different clients. Returns an - * array of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param <R> The type of the remote method return. - * @param locations List of remote locations to call concurrently. - * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @param clazz Type of the remote return type. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requiredResponse=true and any of the calls throw an - * exception. - */ - public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby, Class<R> clazz) - throws IOException { - return invokeConcurrent( - locations, method, requireResponse, standby, -1, clazz); - } - - /** - * Invokes multiple concurrent proxy calls to different clients. Returns an - * array of results. - * - * Re-throws exceptions generated by the remote RPC call as either - * RemoteException or IOException. - * - * @param <T> The type of the remote location. - * @param <R> The type of the remote method return. - * @param locations List of remote locations to call concurrently. - * @param method The remote method and parameters to invoke. - * @param requireResponse If true an exception will be thrown if all calls do - * not complete. If false exceptions are ignored and all data results - * successfully received are returned. - * @param standby If the requests should go to the standby namenodes too. - * @param timeOutMs Timeout for each individual call. - * @param clazz Type of the remote return type. - * @return Result of invoking the method per subcluster: nsId -> result. - * @throws IOException If requiredResponse=true and any of the calls throw an - * exception. - */ - @SuppressWarnings("unchecked") - public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent( - final Collection<T> locations, final RemoteMethod method, - boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz) - throws IOException { - - final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); - final Method m = method.getMethod(); - - if (locations.size() == 1) { - // Shortcut, just one call - T location = locations.iterator().next(); - String ns = location.getNameserviceId(); - final List<? extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(ns); - Object[] paramList = method.getParams(location); - Object result = invokeMethod(ugi, namenodes, m, paramList); - return Collections.singletonMap(location, clazz.cast(result)); - } - - List<T> orderedLocations = new LinkedList<>(); - Set<Callable<Object>> callables = new HashSet<>(); - for (final T location : locations) { - String nsId = location.getNameserviceId(); - final List<? extends FederationNamenodeContext> namenodes = - getNamenodesForNameservice(nsId); - final Object[] paramList = method.getParams(location); - if (standby) { - // Call the objectGetter to all NNs (including standby) - for (final FederationNamenodeContext nn : namenodes) { - String nnId = nn.getNamenodeId(); - final List<FederationNamenodeContext> nnList = - Collections.singletonList(nn); - T nnLocation = location; - if (location instanceof RemoteLocation) { - nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest()); - } - orderedLocations.add(nnLocation); - callables.add(new Callable<Object>() { - public Object call() throws Exception { - return invokeMethod(ugi, nnList, m, paramList); - } - }); - } - } else { - // Call the objectGetter in order of nameservices in the NS list - orderedLocations.add(location); - callables.add(new Callable<Object>() { - public Object call() throws Exception { - return invokeMethod(ugi, namenodes, m, paramList); - } - }); - } - } - - if (rpcMonitor != null) { - rpcMonitor.proxyOp(); - } - - try { - List<Future<Object>> futures = null; - if (timeOutMs > 0) { - futures = executorService.invokeAll( - callables, timeOutMs, TimeUnit.MILLISECONDS); - } else { - futures = executorService.invokeAll(callables); - } - Map<T, R> results = new TreeMap<>(); - Map<T, IOException> exceptions = new TreeMap<>(); - for (int i=0; i<futures.size(); i++) { - T location = orderedLocations.get(i); - try { - Future<Object> future = futures.get(i); - Object result = future.get(); - results.put(location, clazz.cast(result)); - } catch (CancellationException ce) { - T loc = orderedLocations.get(i); - String msg = - "Invocation to \"" + loc + "\" for \"" + method + "\" timed out"; - LOG.error(msg); - IOException ioe = new IOException(msg); - exceptions.put(location, ioe); - } catch (ExecutionException ex) { - Throwable cause = ex.getCause(); - LOG.debug("Canot execute {} in {}: {}", - m.getName(), location, cause.getMessage()); - - // Convert into IOException if needed - IOException ioe = null; - if (cause instanceof IOException) { - ioe = (IOException) cause; - } else { - ioe = new IOException("Unhandled exception while proxying API " + - m.getName() + ": " + cause.getMessage(), cause); - } - - // Response from all servers required, use this error. - if (requireResponse) { - throw ioe; - } - - // Store the exceptions - exceptions.put(location, ioe); - } - } - - // Throw the exception for the first location if there are no results - if (results.isEmpty()) { - T location = orderedLocations.get(0); - IOException ioe = exceptions.get(location); - if (ioe != null) { - throw ioe; - } - } - - return results; - } catch (InterruptedException ex) { - LOG.error("Unexpected error while invoking API: {}", ex.getMessage()); - throw new IOException( - "Unexpected error while invoking API " + ex.getMessage(), ex); - } - } - - /** - * Get a prioritized list of NNs that share the same nameservice ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. - * - * @param nsId The nameservice ID for the namespace. - * @return A prioritized list of NNs to use for communication. - * @throws IOException If a NN cannot be located for the nameservice ID. - */ - private List<? extends FederationNamenodeContext> getNamenodesForNameservice( - final String nsId) throws IOException { - - final List<? extends FederationNamenodeContext> namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("Cannot locate a registered namenode for " + nsId + - " from " + this.routerId); - } - return namenodes; - } - - /** - * Get a prioritized list of NNs that share the same block pool ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. - * - * @param bpId The blockpool ID for the namespace. - * @return A prioritized list of NNs to use for communication. - * @throws IOException If a NN cannot be located for the block pool ID. - */ - private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId( - final String bpId) throws IOException { - - List<? extends FederationNamenodeContext> namenodes = - namenodeResolver.getNamenodesForBlockPoolId(bpId); - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("Cannot locate a registered namenode for " + bpId + - " from " + this.routerId); - } - return namenodes; - } - - /** - * Get the nameservice identifier for a block pool. - * - * @param bpId Identifier of the block pool. - * @return Nameservice identifier. - * @throws IOException If a NN cannot be located for the block pool ID. - */ - private String getNameserviceForBlockPoolId(final String bpId) - throws IOException { - List<? extends FederationNamenodeContext> namenodes = - getNamenodesForBlockPoolId(bpId); - FederationNamenodeContext namenode = namenodes.get(0); - return namenode.getNameserviceId(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java deleted file mode 100644 index df9aa11..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.federation.router; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; - -/** - * Metrics and monitoring interface for the router RPC server. Allows pluggable - * diagnostics and monitoring services to be attached. - */ -public interface RouterRpcMonitor { - - /** - * Initialize the monitor. - * @param conf Configuration for the monitor. - * @param server RPC server. - * @param store State Store. - */ - void init( - Configuration conf, RouterRpcServer server, StateStoreService store); - - /** - * Get Router RPC metrics info. - * @return The instance of FederationRPCMetrics. - */ - FederationRPCMetrics getRPCMetrics(); - - /** - * Close the monitor. - */ - void close(); - - /** - * Start processing an operation on the Router. - */ - void startOp(); - - /** - * Start proxying an operation to the Namenode. - * @return Id of the thread doing the proxying. - */ - long proxyOp(); - - /** - * Mark a proxy operation as completed. - * @param success If the operation was successful. - */ - void proxyOpComplete(boolean success); - - /** - * Failed to proxy an operation to a Namenode because it was in standby. - */ - void proxyOpFailureStandby(); - - /** - * Failed to proxy an operation to a Namenode because of an unexpected - * exception. - */ - void proxyOpFailureCommunicate(); - - /** - * Failed to proxy an operation because it is not implemented. - */ - void proxyOpNotImplemented(); - - /** - * Retry to proxy an operation to a Namenode because of an unexpected - * exception. - */ - void proxyOpRetries(); - - /** - * If the Router cannot contact the State Store in an operation. - */ - void routerFailureStateStore(); - - /** - * If the Router is in safe mode. - */ - void routerFailureSafemode(); - - /** - * If a path is locked. - */ - void routerFailureLocked(); - - /** - * If a path is in a read only mount point. - */ - void routerFailureReadOnly(); -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org