http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java new file mode 100644 index 0000000..b36e459 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClient.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolTranslatorPB; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Client to connect to the {@link Router} via the admin protocol. 
+ */ +@Private +public class RouterClient implements Closeable { + + private final RouterAdminProtocolTranslatorPB proxy; + private final UserGroupInformation ugi; + + private static RouterAdminProtocolTranslatorPB createRouterProxy( + InetSocketAddress address, Configuration conf, UserGroupInformation ugi) + throws IOException { + + RPC.setProtocolEngine( + conf, RouterAdminProtocolPB.class, ProtobufRpcEngine.class); + + AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false); + final long version = RPC.getProtocolVersion(RouterAdminProtocolPB.class); + RouterAdminProtocolPB proxy = RPC.getProtocolProxy( + RouterAdminProtocolPB.class, version, address, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), + RPC.getRpcTimeout(conf), null, + fallbackToSimpleAuth).getProxy(); + + return new RouterAdminProtocolTranslatorPB(proxy); + } + + public RouterClient(InetSocketAddress address, Configuration conf) + throws IOException { + this.ugi = UserGroupInformation.getCurrentUser(); + this.proxy = createRouterProxy(address, conf, ugi); + } + + public MountTableManager getMountTableManager() { + return proxy; + } + + public RouterStateManager getRouterStateManager() { + return proxy; + } + + @Override + public synchronized void close() throws IOException { + RPC.stopProxy(proxy); + } +} \ No newline at end of file
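For context, a minimal usage sketch of the admin client above: the router host and admin port below are hypothetical placeholders (8111 assumes the default dfs.federation.router.admin-address port), and the mount table request/response classes are the ones used elsewhere in this patchset.

  import java.net.InetSocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.HdfsConfiguration;
  import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
  import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
  import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
  import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
  import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;

  public class RouterClientExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new HdfsConfiguration();
      // Hypothetical Router admin endpoint.
      InetSocketAddress routerAdmin =
          new InetSocketAddress("router.example.com", 8111);
      // RouterClient is Closeable, so try-with-resources stops the RPC proxy.
      try (RouterClient client = new RouterClient(routerAdmin, conf)) {
        MountTableManager mountTable = client.getMountTableManager();
        // List every mount table entry under the root mount point.
        GetMountTableEntriesRequest request =
            GetMountTableEntriesRequest.newInstance("/");
        GetMountTableEntriesResponse response =
            mountTable.getMountTableEntries(request);
        for (MountTable entry : response.getEntries()) {
          System.out.println(entry.getSourcePath());
        }
      }
    }
  }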
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java new file mode 100644 index 0000000..fe172c2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.RecordStore; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the Router's current state in the State Store. + */ +public class RouterHeartbeatService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterHeartbeatService.class); + + /** Router we are heartbeating. */ + private final Router router; + + /** + * Create a new Router heartbeat service. + * + * @param router Router to heartbeat. + */ + public RouterHeartbeatService(Router router) { + super(RouterHeartbeatService.class.getSimpleName()); + this.router = router; + } + + /** + * Trigger the update of the Router state asynchronously.
+ */ + protected void updateStateAsync() { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + updateStateStore(); + } + }, "Router Heartbeat Async"); + thread.setDaemon(true); + thread.start(); + } + + /** + * Update the state of the Router in the State Store. + */ + @VisibleForTesting + synchronized void updateStateStore() { + String routerId = router.getRouterId(); + if (routerId == null) { + LOG.error("Cannot heartbeat for router: unknown router id"); + return; + } + if (isStoreAvailable()) { + RouterStore routerStore = router.getRouterStateManager(); + try { + RouterState record = RouterState.newInstance( + routerId, router.getStartTime(), router.getRouterState()); + StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance( + getStateStoreVersion(MembershipStore.class), + getStateStoreVersion(MountTableStore.class)); + record.setStateStoreVersion(stateStoreVersion); + RouterHeartbeatRequest request = + RouterHeartbeatRequest.newInstance(record); + RouterHeartbeatResponse response = routerStore.routerHeartbeat(request); + if (!response.getStatus()) { + LOG.warn("Cannot heartbeat router {}", routerId); + } else { + LOG.debug("Router heartbeat for router {}", routerId); + } + } catch (IOException e) { + LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage()); + } + } else { + LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId); + } + } + + /** + * Get the version of the data in the State Store. + * + * @param clazz Class in the State Store. + * @return Version of the data. + */ + private <R extends BaseRecord, S extends RecordStore<R>> + long getStateStoreVersion(final Class<S> clazz) { + long version = -1; + try { + StateStoreService stateStore = router.getStateStore(); + S recordStore = stateStore.getRegisteredRecordStore(clazz); + if (recordStore != null) { + if (recordStore instanceof CachedRecordStore) { + CachedRecordStore<R> cachedRecordStore = + (CachedRecordStore<R>) recordStore; + List<R> records = cachedRecordStore.getCachedRecords(); + for (BaseRecord record : records) { + if (record.getDateModified() > version) { + version = record.getDateModified(); + } + } + } + } + } catch (Exception e) { + LOG.error("Cannot get version for {}: {}", clazz, e.getMessage()); + } + return version; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + long interval = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS, + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT, + TimeUnit.MILLISECONDS); + this.setIntervalMs(interval); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + updateStateStore(); + } + + private boolean isStoreAvailable() { + if (router.getRouterStateManager() == null) { + return false; + } + if (router.getStateStore() == null) { + return false; + } + return router.getStateStore().isDriverReady(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java new file mode 100644 index 0000000..93a9cff --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHttpServer.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.service.AbstractService; + +/** + * Web interface for the {@link Router}. It exposes the Web UI and the WebHDFS + * methods from {@link RouterWebHdfsMethods}. + */ +public class RouterHttpServer extends AbstractService { + + protected static final String NAMENODE_ATTRIBUTE_KEY = "name.node"; + + + /** Configuration for the Router HTTP server. */ + private Configuration conf; + + /** Router using this HTTP server. */ + private final Router router; + + /** HTTP server. */ + private HttpServer2 httpServer; + + /** HTTP addresses. */ + private InetSocketAddress httpAddress; + private InetSocketAddress httpsAddress; + + + public RouterHttpServer(Router router) { + super(RouterHttpServer.class.getName()); + this.router = router; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; + + // Get HTTP address + this.httpAddress = conf.getSocketAddr( + RBFConfigKeys.DFS_ROUTER_HTTP_BIND_HOST_KEY, + RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_DEFAULT, + RBFConfigKeys.DFS_ROUTER_HTTP_PORT_DEFAULT); + + // Get HTTPs address + this.httpsAddress = conf.getSocketAddr( + RBFConfigKeys.DFS_ROUTER_HTTPS_BIND_HOST_KEY, + RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_HTTPS_ADDRESS_DEFAULT, + RBFConfigKeys.DFS_ROUTER_HTTPS_PORT_DEFAULT); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + // Build and start server + String webApp = "router"; + HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN( + this.conf, this.httpAddress, this.httpsAddress, webApp, + DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, + DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY); + + this.httpServer = builder.build(); + + this.httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, this.router); + this.httpServer.setAttribute(JspHelper.CURRENT_CONF, this.conf); + setupServlets(this.httpServer, this.conf); + + this.httpServer.start(); + + // The server port can be ephemeral... 
ensure we have the correct info + InetSocketAddress listenAddress = this.httpServer.getConnectorAddress(0); + if (listenAddress != null) { + this.httpAddress = new InetSocketAddress(this.httpAddress.getHostName(), + listenAddress.getPort()); + } + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if(this.httpServer != null) { + this.httpServer.stop(); + } + super.serviceStop(); + } + + private static void setupServlets( + HttpServer2 httpServer, Configuration conf) { + // TODO Add servlets for FSCK, etc + } + + public InetSocketAddress getHttpAddress() { + return this.httpAddress; + } + + public InetSocketAddress getHttpsAddress() { + return this.httpsAddress; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java new file mode 100644 index 0000000..851538a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.source.JvmMetrics; + +/** + * This class is for maintaining the various Router activity statistics + * and publishing them through the metrics interfaces. 
+ */ +@Metrics(name="RouterActivity", about="Router metrics", context="dfs") +public class RouterMetrics { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + @Metric("Duration in SafeMode at startup in msec") + private MutableGaugeInt safeModeTime; + + private JvmMetrics jvmMetrics = null; + + RouterMetrics( + String processName, String sessionId, final JvmMetrics jvmMetrics) { + this.jvmMetrics = jvmMetrics; + registry.tag(ProcessName, processName).tag(SessionId, sessionId); + } + + public static RouterMetrics create(Configuration conf) { + String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY); + String processName = "Router"; + MetricsSystem ms = DefaultMetricsSystem.instance(); + JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms); + + return ms.register(new RouterMetrics(processName, sessionId, jm)); + } + + public JvmMetrics getJvmMetrics() { + return jvmMetrics; + } + + public void shutdown() { + DefaultMetricsSystem.shutdown(); + } + + public void setSafeModeTime(long elapsed) { + safeModeTime.set((int) elapsed); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java new file mode 100644 index 0000000..f4debce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.AbstractService; + +/** + * Service to manage the metrics of the Router. + */ +public class RouterMetricsService extends AbstractService { + + /** Router for this metrics. */ + private final Router router; + + /** Router metrics. */ + private RouterMetrics routerMetrics; + /** Federation metrics. */ + private FederationMetrics federationMetrics; + /** Namenode mock metrics. 
*/ + private NamenodeBeanMetrics nnMetrics; + + + public RouterMetricsService(final Router router) { + super(RouterMetricsService.class.getName()); + this.router = router; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.routerMetrics = RouterMetrics.create(configuration); + } + + @Override + protected void serviceStart() throws Exception { + // Wrapper for all the FSNamesystem JMX interfaces + this.nnMetrics = new NamenodeBeanMetrics(this.router); + + // Federation MBean JMX interface + this.federationMetrics = new FederationMetrics(this.router); + } + + @Override + protected void serviceStop() throws Exception { + // Remove JMX interfaces + if (this.federationMetrics != null) { + this.federationMetrics.close(); + } + + // Remove Namenode JMX interfaces + if (this.nnMetrics != null) { + this.nnMetrics.close(); + } + + // Shutdown metrics + if (this.routerMetrics != null) { + this.routerMetrics.shutdown(); + } + } + + /** + * Get the metrics system for the Router. + * + * @return Router metrics. + */ + public RouterMetrics getRouterMetrics() { + return this.routerMetrics; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. + */ + public FederationMetrics getFederationMetrics() { + return this.federationMetrics; + } + + /** + * Get the JVM metrics for the Router. + * + * @return JVM metrics. + */ + public JvmMetrics getJvmMetrics() { + if (this.routerMetrics == null) { + return null; + } + return this.routerMetrics.getJvmMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java new file mode 100644 index 0000000..9d81dce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterPermissionChecker.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; + +/** + * Class that helps in checking permissions in Router-based federation. + */ +public class RouterPermissionChecker extends FSPermissionChecker { + static final Log LOG = LogFactory.getLog(RouterPermissionChecker.class); + + /** Mount table default permission. */ + public static final short MOUNT_TABLE_PERMISSION_DEFAULT = 00755; + + public RouterPermissionChecker(String routerOwner, String supergroup, + UserGroupInformation callerUgi) { + super(routerOwner, supergroup, callerUgi, null); + } + + /** + * Whether a mount table entry can be accessed by the current context. + * + * @param mountTable + * MountTable being accessed + * @param access + * type of action being performed on the mount table entry + * @throws AccessControlException + * if the mount table cannot be accessed + */ + public void checkPermission(MountTable mountTable, FsAction access) + throws AccessControlException { + if (isSuperUser()) { + return; + } + + FsPermission mode = mountTable.getMode(); + if (getUser().equals(mountTable.getOwnerName()) + && mode.getUserAction().implies(access)) { + return; + } + + if (isMemberOfGroup(mountTable.getGroupName()) + && mode.getGroupAction().implies(access)) { + return; + } + + if (!getUser().equals(mountTable.getOwnerName()) + && !isMemberOfGroup(mountTable.getGroupName()) + && mode.getOtherAction().implies(access)) { + return; + } + + throw new AccessControlException( + "Permission denied while accessing mount table " + + mountTable.getSourcePath() + + ": user " + getUser() + " does not have " + access.toString() + + " permissions."); + } +}
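To illustrate the checker above, a hedged sketch of how an admin operation might gate a mount table edit; the owner/group strings are placeholders for the Router's own identity, and the wrapping helper is an assumption for the example.

  import java.io.IOException;
  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
  import org.apache.hadoop.security.UserGroupInformation;

  public class MountTablePermissionExample {
    // Throws AccessControlException (an IOException subclass) unless the
    // caller is the superuser, the entry owner with WRITE access, a member of
    // the entry group with WRITE access, or covered by the "other" WRITE bits.
    public static void checkCanWrite(MountTable entry) throws IOException {
      UserGroupInformation callerUgi = UserGroupInformation.getCurrentUser();
      // Placeholder identity for the Router process owner and supergroup.
      RouterPermissionChecker pc =
          new RouterPermissionChecker("routerOwner", "supergroup", callerUgi);
      pc.checkPermission(entry, FsAction.WRITE);
    }
  }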
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java new file mode 100644 index 0000000..0df34fc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry; + +import java.util.HashSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; + +/** + * Quota manager for the Router. The manager maintains a + * {@link RouterQuotaUsage} cache of mount table entries and manages + * those quota caches. + */ +public class RouterQuotaManager { + /** Quota usage <MountTable Path, Aggregated QuotaUsage> cache. */ + private TreeMap<String, RouterQuotaUsage> cache; + + /** Lock to access the quota cache. */ + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + + public RouterQuotaManager() { + this.cache = new TreeMap<>(); + } + + /** + * Get all the mount quota paths. + */ + public Set<String> getAll() { + readLock.lock(); + try { + return this.cache.keySet(); + } finally { + readLock.unlock(); + } + } + + /** + * Get the quota usage of the given path, falling back to the nearest + * ancestor whose quota is set. + * @param path The path being written. + * @return RouterQuotaUsage Quota usage. + */ + public RouterQuotaUsage getQuotaUsage(String path) { + readLock.lock(); + try { + RouterQuotaUsage quotaUsage = this.cache.get(path); + if (quotaUsage != null && isQuotaSet(quotaUsage)) { + return quotaUsage; + } + + // If not found, look for its parent path usage value. + int pos = path.lastIndexOf(Path.SEPARATOR); + if (pos != -1) { + String parentPath = path.substring(0, pos); + return getQuotaUsage(parentPath); + } + } finally { + readLock.unlock(); + } + + return null; + } + + /** + * Get the child paths (possibly including the path itself) under the + * specified federation path. + * @param parentPath Parent federation path. + * @return Set<String> Children path set. + */ + public Set<String> getPaths(String parentPath) { + readLock.lock(); + try { + String from = parentPath; + String to = parentPath + Character.MAX_VALUE; + SortedMap<String, RouterQuotaUsage> subMap = this.cache.subMap(from, to); + + Set<String> validPaths = new HashSet<>(); + if (subMap != null) { + for (String path : subMap.keySet()) { + if (isParentEntry(path, parentPath)) { + validPaths.add(path); + } + } + } + return validPaths; + } finally { + readLock.unlock(); + } + } + + /** + * Put a new entry into the cache. + * @param path Mount table path. + * @param quotaUsage Corresponding cache value. + */ + public void put(String path, RouterQuotaUsage quotaUsage) { + writeLock.lock(); + try { + this.cache.put(path, quotaUsage); + } finally { + writeLock.unlock(); + } + } + + /** + * Remove the entry from the cache. + * @param path Mount table path. + */ + public void remove(String path) { + writeLock.lock(); + try { + this.cache.remove(path); + } finally { + writeLock.unlock(); + } + } + + /** + * Clear the cache. + */ + public void clear() { + writeLock.lock(); + try { + this.cache.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * Check if the quota is set. + * @param quota RouterQuotaUsage set in the mount table.
+ * @return True if either the namespace or the storage space quota is set. + */ + public boolean isQuotaSet(RouterQuotaUsage quota) { + if (quota != null) { + long nsQuota = quota.getQuota(); + long ssQuota = quota.getSpaceQuota(); + + // once nsQuota or ssQuota is set, a quota is set on this mount table + if (nsQuota != HdfsConstants.QUOTA_DONT_SET + || ssQuota != HdfsConstants.QUOTA_DONT_SET) { + return true; + } + } + + return false; + } +}
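To make the nearest-ancestor fallback in getQuotaUsage concrete, a small hedged sketch (the paths and quota values are made up for the example):

  import org.apache.hadoop.hdfs.protocol.HdfsConstants;

  public class QuotaManagerExample {
    public static void main(String[] args) throws Exception {
      RouterQuotaManager manager = new RouterQuotaManager();
      RouterQuotaUsage usage = new RouterQuotaUsage.Builder()
          .quota(10)                                // namespace quota: 10 objects
          .fileAndDirectoryCount(4)                 // current usage
          .spaceQuota(HdfsConstants.QUOTA_DONT_SET) // no storage space quota
          .spaceConsumed(0)
          .build();
      manager.put("/data", usage);

      // There is no entry for "/data/app1/logs" itself, so the lookup walks
      // up the path components and returns the "/data" usage, whose
      // namespace quota is set.
      RouterQuotaUsage found = manager.getQuotaUsage("/data/app1/logs");
      found.verifyNamespaceQuota(); // passes: 4 objects used of 10 allowed
    }
  }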
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java new file mode 100644 index 0000000..9fc93c1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the {@link RouterQuotaUsage} + * cached information in the {@link Router} and update the corresponding + * mount table entries in the State Store. + */ +public class RouterQuotaUpdateService extends PeriodicService { + private static final Logger LOG = + LoggerFactory.getLogger(RouterQuotaUpdateService.class); + + private MountTableStore mountTableStore; + private RouterRpcServer rpcServer; + /** Router using this Service. */ + private final Router router; + /** Router Quota manager. */ + private RouterQuotaManager quotaManager; + + public RouterQuotaUpdateService(final Router router) throws IOException { + super(RouterQuotaUpdateService.class.getName()); + this.router = router; + this.rpcServer = router.getRpcServer(); + this.quotaManager = router.getQuotaManager(); + + if (this.quotaManager == null) { + throw new IOException("Router quota manager is not initialized."); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.setIntervalMs(conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL, + RBFConfigKeys.DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS)); + + super.serviceInit(conf); + } + + @Override + protected void periodicInvoke() { + LOG.debug("Start to update quota cache."); + try { + List<MountTable> updateMountTables = new LinkedList<>(); + List<MountTable> mountTables = getQuotaSetMountTables(); + for (MountTable entry : mountTables) { + String src = entry.getSourcePath(); + RouterQuotaUsage oldQuota = entry.getQuota(); + long nsQuota = oldQuota.getQuota(); + long ssQuota = oldQuota.getSpaceQuota(); + // Call RouterRpcServer#getQuotaUsage to get the current quota usage. + QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule() + .getQuotaUsage(src); + // If the quota is not set in some subclusters under the federation + // path, set the quota for this path. + if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) { + this.rpcServer.setQuota(src, nsQuota, ssQuota, null); + } + + RouterQuotaUsage newQuota = generateNewQuota(oldQuota, + currentQuotaUsage); + this.quotaManager.put(src, newQuota); + entry.setQuota(newQuota); + + // only update mount table entries whose quota has changed + if (!oldQuota.equals(newQuota)) { + updateMountTables.add(entry); + + LOG.debug( + "Update quota usage entity of path: {}, nsCount: {}," + + " nsQuota: {}, ssCount: {}, ssQuota: {}.", + src, newQuota.getFileAndDirectoryCount(), + newQuota.getQuota(), newQuota.getSpaceConsumed(), + newQuota.getSpaceQuota()); + } + } + + updateMountTableEntries(updateMountTables); + } catch (IOException e) { + LOG.error("Error updating quota cache.", e); + } + } + + /** + * Get the mount table store management interface. + * @return MountTableStore instance. + * @throws IOException + */ + private MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = router.getStateStore().getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("Mount table state store is not available."); + } + } + return this.mountTableStore; + } + + /** + * Get all the existing mount tables. + * @return List of mount tables. + * @throws IOException + */ + private List<MountTable> getMountTableEntries() throws IOException { + // scan mount tables from root path + GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest + .newInstance("/"); + GetMountTableEntriesResponse getResponse = getMountTableStore() + .getMountTableEntries(getRequest); + return getResponse.getEntries(); + } + + /** + * Get the mount table entries for which a quota has been set. + * In the process, the quota usage cache is also updated by the + * quota manager: + * 1. Stale paths (entries) are removed. + * 2. Existing entries are overridden and updated. + * @return List of mount table entries with a quota set.
+ * @throws IOException + */ + private List<MountTable> getQuotaSetMountTables() throws IOException { + List<MountTable> mountTables = getMountTableEntries(); + Set<String> stalePaths = new HashSet<>(); + for (String path : this.quotaManager.getAll()) { + stalePaths.add(path); + } + + List<MountTable> neededMountTables = new LinkedList<>(); + for (MountTable entry : mountTables) { + // select mount table entries whose quota is set + if (isQuotaSet(entry)) { + neededMountTables.add(entry); + } + + // update mount table entry info in the quota cache + String src = entry.getSourcePath(); + this.quotaManager.put(src, entry.getQuota()); + stalePaths.remove(src); + } + + // remove stale paths that are currently cached + for (String stalePath : stalePaths) { + this.quotaManager.remove(stalePath); + } + + return neededMountTables; + } + + /** + * Check if a quota is set in the given MountTable. + * @param mountTable Mount table entry. + */ + private boolean isQuotaSet(MountTable mountTable) { + if (mountTable != null) { + return this.quotaManager.isQuotaSet(mountTable.getQuota()); + } + return false; + } + + /** + * Generate a new quota based on the old quota and the current quota usage + * value. + * @param oldQuota Old quota stored in the State Store. + * @param currentQuotaUsage Current quota usage value queried from the + * subcluster. + * @return A new RouterQuotaUsage. + */ + private RouterQuotaUsage generateNewQuota(RouterQuotaUsage oldQuota, + QuotaUsage currentQuotaUsage) { + RouterQuotaUsage newQuota = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(currentQuotaUsage.getFileAndDirectoryCount()) + .quota(oldQuota.getQuota()) + .spaceConsumed(currentQuotaUsage.getSpaceConsumed()) + .spaceQuota(oldQuota.getSpaceQuota()).build(); + return newQuota; + } + + /** + * Write the updated mount table entries to the State Store. + * @param updateMountTables Mount tables to be updated. + * @throws IOException + */ + private void updateMountTableEntries(List<MountTable> updateMountTables) + throws IOException { + for (MountTable entry : updateMountTables) { + UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest + .newInstance(entry); + getMountTableStore().updateMountTableEntry(updateRequest); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java new file mode 100644 index 0000000..eedd80f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUsage.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; +import org.apache.hadoop.hdfs.server.namenode.DirectoryWithQuotaFeature; +import org.apache.hadoop.hdfs.server.namenode.Quota; +import org.apache.hadoop.util.StringUtils; + +/** + * The subclass of {@link QuotaUsage} used in Router-based federation. + */ +public final class RouterQuotaUsage extends QuotaUsage { + + /** Default quota usage count. */ + public static final long QUOTA_USAGE_COUNT_DEFAULT = 0; + + private RouterQuotaUsage(Builder builder) { + super(builder); + } + + /** Build the instance based on the builder. */ + public static class Builder extends QuotaUsage.Builder { + + public RouterQuotaUsage build() { + return new RouterQuotaUsage(this); + } + + @Override + public Builder fileAndDirectoryCount(long count) { + super.fileAndDirectoryCount(count); + return this; + } + + @Override + public Builder quota(long quota) { + super.quota(quota); + return this; + } + + @Override + public Builder spaceConsumed(long spaceConsumed) { + super.spaceConsumed(spaceConsumed); + return this; + } + + @Override + public Builder spaceQuota(long spaceQuota) { + super.spaceQuota(spaceQuota); + return this; + } + } + + /** + * Verify if namespace quota is violated once quota is set. Relevant + * method {@link DirectoryWithQuotaFeature#verifyNamespaceQuota}. + * @throws NSQuotaExceededException + */ + public void verifyNamespaceQuota() throws NSQuotaExceededException { + if (Quota.isViolated(getQuota(), getFileAndDirectoryCount())) { + throw new NSQuotaExceededException(getQuota(), + getFileAndDirectoryCount()); + } + } + + /** + * Verify if storage space quota is violated once quota is set. Relevant + * method {@link DirectoryWithQuotaFeature#verifyStoragespaceQuota}. 
+ * @throws DSQuotaExceededException + */ + public void verifyStoragespaceQuota() throws DSQuotaExceededException { + if (Quota.isViolated(getSpaceQuota(), getSpaceConsumed())) { + throw new DSQuotaExceededException(getSpaceQuota(), getSpaceConsumed()); + } + } + + @Override + public String toString() { + String nsQuota = String.valueOf(getQuota()); + String nsCount = String.valueOf(getFileAndDirectoryCount()); + if (getQuota() == HdfsConstants.QUOTA_DONT_SET) { + nsQuota = "-"; + nsCount = "-"; + } + + String ssQuota = StringUtils.byteDesc(getSpaceQuota()); + String ssCount = StringUtils.byteDesc(getSpaceConsumed()); + if (getSpaceQuota() == HdfsConstants.QUOTA_DONT_SET) { + ssQuota = "-"; + ssCount = "-"; + } + + StringBuilder str = new StringBuilder(); + str.append("[NsQuota: ").append(nsQuota).append("/") + .append(nsCount); + str.append(", SsQuota: ").append(ssQuota) + .append("/").append(ssCount) + .append("]"); + return str.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java new file mode 100644 index 0000000..0d298ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -0,0 +1,1021 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A client proxy for Router -> NN communication using the NN ClientProtocol. + * <p> + * Provides methods for routers to invoke remote ClientProtocol methods and + * handle retries/failover. + * <ul> + * <li>invokeSingle Make a single request to a single namespace + * <li>invokeSequential Make a sequential series of requests to multiple + * ordered namespaces until a condition is met. + * <li>invokeConcurrent Make concurrent requests to multiple namespaces and + * return all of the results. + * </ul> + * Also maintains a cached pool of connections to NNs. Connections are managed + * by the ConnectionManager and are unique to each user + NN. The size of the + * connection pool can be configured. Larger pools allow for more simultaneous + * requests to a single NN from a single user. + */ +public class RouterRpcClient { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterRpcClient.class); + + + /** Router identifier. */ + private final String routerId; + + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private final ActiveNamenodeResolver namenodeResolver; + + /** Connection pool to the Namenodes per user for performance. */ + private final ConnectionManager connectionManager; + /** Service to run asynchronous calls. */ + private final ExecutorService executorService; + /** Retry policy for router -> NN communication. */ + private final RetryPolicy retryPolicy; + /** Optional perf monitor.
*/ + private final RouterRpcMonitor rpcMonitor; + + /** Pattern to parse a stack trace line. */ + private static final Pattern STACK_TRACE_PATTERN = + Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)"); + + + /** + * Create a router RPC client to manage remote procedure calls to NNs. + * + * @param conf HDFS configuration. + * @param identifier Router identifier. + * @param resolver A NN resolver to determine the currently active NN in HA. + * @param monitor Optional performance monitor. + */ + public RouterRpcClient(Configuration conf, String identifier, + ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { + this.routerId = identifier; + + this.namenodeResolver = resolver; + + this.connectionManager = new ConnectionManager(conf); + this.connectionManager.start(); + + int numThreads = conf.getInt( + RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, + RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("RPC Router Client-%d") + .build(); + this.executorService = Executors.newFixedThreadPool( + numThreads, threadFactory); + + this.rpcMonitor = monitor; + + int maxFailoverAttempts = conf.getInt( + HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, + HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); + int maxRetryAttempts = conf.getInt( + RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS, + RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT); + int failoverSleepBaseMillis = conf.getInt( + HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, + HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); + int failoverSleepMaxMillis = conf.getInt( + HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_KEY, + HdfsClientConfigKeys.Failover.SLEEPTIME_MAX_DEFAULT); + this.retryPolicy = RetryPolicies.failoverOnNetworkException( + RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailoverAttempts, maxRetryAttempts, + failoverSleepBaseMillis, failoverSleepMaxMillis); + } + + /** + * Get the active namenode resolver used by this client. + * @return Active namenode resolver. + */ + public ActiveNamenodeResolver getNamenodeResolver() { + return this.namenodeResolver; + } + + /** + * Shutdown the client. + */ + public void shutdown() { + if (this.connectionManager != null) { + this.connectionManager.close(); + } + if (this.executorService != null) { + this.executorService.shutdownNow(); + } + } + + /** + * Total number of available sockets between the router and NNs. + * + * @return Number of namenode clients. + */ + public int getNumConnections() { + return this.connectionManager.getNumConnections(); + } + + /** + * Total number of active sockets between the router and NNs. + * + * @return Number of namenode clients. + */ + public int getNumActiveConnections() { + return this.connectionManager.getNumActiveConnections(); + } + + /** + * Total number of open connection pools to a NN. Each connection pool + * represents one user + one NN. + * + * @return Number of connection pools. + */ + public int getNumConnectionPools() { + return this.connectionManager.getNumConnectionPools(); + } + + /** + * Number of connections between the router and NNs whose sockets are being + * created. + * + * @return Number of connections waiting to be created. + */ + public int getNumCreatingConnections() { + return this.connectionManager.getNumCreatingConnections(); + } + + /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + return this.connectionManager.getJSON(); + } + + /** + * Get ClientProtocol proxy client for a NameNode.
Each combination of user + * NN must use a unique proxy client. Previously created clients are cached + * and stored in a connection pool by the ConnectionManager. + * + * @param ugi User group information. + * @param nsId Nameservice identifier. + * @param rpcAddress ClientProtocol RPC server address of the NN. + * @return ConnectionContext containing a ClientProtocol proxy client for the + * NN + current user. + * @throws IOException If we cannot get a connection to the NameNode. + */ + private ConnectionContext getConnection( + UserGroupInformation ugi, String nsId, String rpcAddress) + throws IOException { + ConnectionContext connection = null; + try { + // Each proxy holds the UGI info for the current user when it is created. + // This cache does not scale very well, one entry per user per namenode, + // and may need to be adjusted and/or selectively pruned. The cache is + // important due to the excessive overhead of creating a new proxy wrapper + // for each individual request. + + // TODO Add tokens from the federated UGI + connection = this.connectionManager.getConnection(ugi, rpcAddress); + LOG.debug("User {} NN {} is using connection {}", + ugi.getUserName(), rpcAddress, connection); + } catch (Exception ex) { + LOG.error("Cannot open NN client to address: {}", rpcAddress, ex); + } + + if (connection == null) { + throw new IOException("Cannot get a connection to " + rpcAddress); + } + return connection; + } + + /** + * Convert an exception to an IOException. + * + * For a non-IOException, wrap it with IOException. For a RemoteException, + * unwrap it. For an IOException which is not a RemoteException, return it. + * + * @param e Exception to convert into an IOException. + * @return Created IO exception. + */ + private static IOException toIOException(Exception e) { + if (e instanceof RemoteException) { + return ((RemoteException) e).unwrapRemoteException(); + } + if (e instanceof IOException) { + return (IOException)e; + } + return new IOException(e); + } + + /** + * Check whether we should retry the RPC call. + * + * @param ioe IOException reported. + * @param retryCount Number of retries. + * @param nsId Nameservice ID. + * @return Retry decision. + * @throws IOException Original exception if the retry policy generates one + * or IOException for no available namenodes. + */ + private RetryDecision shouldRetry(final IOException ioe, final int retryCount, + final String nsId) throws IOException { + // check for the case where the cluster is unavailable + if (isClusterUnAvailable(nsId)) { + // allow one retry if the cluster is unavailable + if (retryCount == 0) { + return RetryDecision.RETRY; + } else { + throw new IOException("No namenode available under nameservice " + nsId, + ioe); + } + } + + try { + final RetryPolicy.RetryAction a = + this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); + return a.action; + } catch (Exception ex) { + LOG.error("Re-throwing API exception, no more retries", ex); + throw toIOException(ex); + } + } + + /** + * Invokes a method against the ClientProtocol proxy server. If a standby + * exception is generated by the call to the client, retries using the + * alternate server. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param ugi User group information. + * @param namenodes A prioritized list of namenodes within the same + * nameservice. + * @param method Remote ClientProtocol method to invoke. + * @param params Variable list of parameters matching the method.
+ * @return The result of invoking the method. + * @throws IOException + */ + private Object invokeMethod( + final UserGroupInformation ugi, + final List<? extends FederationNamenodeContext> namenodes, + final Method method, final Object... params) throws IOException { + + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("No namenodes to invoke " + method.getName() + + " with params " + Arrays.toString(params) + " from " + this.routerId); + } + + Object ret = null; + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + boolean failover = false; + Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); + for (FederationNamenodeContext namenode : namenodes) { + ConnectionContext connection = null; + try { + String nsId = namenode.getNameserviceId(); + String rpcAddress = namenode.getRpcAddress(); + connection = this.getConnection(ugi, nsId, rpcAddress); + ProxyAndInfo<ClientProtocol> client = connection.getClient(); + ClientProtocol proxy = client.getProxy(); + ret = invoke(nsId, 0, method, proxy, params); + if (failover) { + // Success on alternate server, update + InetSocketAddress address = client.getAddress(); + namenodeResolver.updateActiveNamenode(nsId, address); + } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } + return ret; + } catch (IOException ioe) { + ioes.put(namenode, ioe); + if (ioe instanceof StandbyException) { + // Fail over indicated by retry policy and/or NN + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureStandby(); + } + failover = true; + } else if (ioe instanceof RemoteException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } + // RemoteException returned by NN + throw (RemoteException) ioe; + } else { + // Other communication error, this is a failure + // Communication retries are handled by the retry policy + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(); + this.rpcMonitor.proxyOpComplete(false); + } + throw ioe; + } + } finally { + if (connection != null) { + connection.release(); + } + } + } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(false); + } + + // All namenodes were unavailable or in standby + String msg = "No namenode available to invoke " + method.getName() + " " + + Arrays.toString(params); + LOG.error(msg); + for (Entry<FederationNamenodeContext, IOException> entry : + ioes.entrySet()) { + FederationNamenodeContext namenode = entry.getKey(); + String nsId = namenode.getNameserviceId(); + String nnId = namenode.getNamenodeId(); + String addr = namenode.getRpcAddress(); + IOException ioe = entry.getValue(); + if (ioe instanceof StandbyException) { + LOG.error("{} {} at {} is in Standby", nsId, nnId, addr); + } else { + LOG.error("{} {} at {} error: \"{}\"", + nsId, nnId, addr, ioe.getMessage()); + } + } + throw new StandbyException(msg); + } + + /** + * Invokes a method on the designated object. Catches exceptions specific to + * the invocation. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param nsId Identifier for the namespace + * @param retryCount Current retry times + * @param method Method to invoke + * @param obj Target object for the method + * @param params Variable parameters + * @return Response from the remote server + * @throws IOException + * @throws InterruptedException + */ + private Object invoke(String nsId, int retryCount, final Method method, + final Object obj, final Object... 
+
+  /**
+   * Invokes a method on the designated object. Catches exceptions specific to
+   * the invocation.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Identifier for the namespace.
+   * @param retryCount Current number of retries.
+   * @param method Method to invoke.
+   * @param obj Target object for the method.
+   * @param params Variable parameters.
+   * @return Response from the remote server.
+   * @throws IOException If the invocation fails.
+   */
+  private Object invoke(String nsId, int retryCount, final Method method,
+      final Object obj, final Object... params) throws IOException {
+    try {
+      return method.invoke(obj, params);
+    } catch (IllegalAccessException | IllegalArgumentException e) {
+      LOG.error("Unexpected exception while proxying API", e);
+      return null;
+    } catch (InvocationTargetException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        IOException ioe = (IOException) cause;
+
+        // Check if we should retry.
+        RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
+        if (decision == RetryDecision.RETRY) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpRetries();
+          }
+
+          // Retry the operation
+          return invoke(nsId, ++retryCount, method, obj, params);
+        } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
+          // Failover: the invoker looks for standby exceptions to fail over.
+          if (ioe instanceof StandbyException) {
+            throw ioe;
+          } else {
+            throw new StandbyException(ioe.getMessage());
+          }
+        } else {
+          if (ioe instanceof RemoteException) {
+            RemoteException re = (RemoteException) ioe;
+            ioe = re.unwrapRemoteException();
+            ioe = getCleanException(ioe);
+          }
+          throw ioe;
+        }
+      } else {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Check if the cluster of the given nameservice ID is unavailable, i.e.,
+   * no namenode is reported in ACTIVE state.
+   *
+   * @param nsId Nameservice ID.
+   * @return True if no namenode in the nameservice is in ACTIVE state.
+   * @throws IOException If the namenode resolver cannot be queried.
+   */
+  private boolean isClusterUnAvailable(String nsId) throws IOException {
+    List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
+        .getNamenodesForNameserviceId(nsId);
+
+    if (nnState != null) {
+      for (FederationNamenodeContext nnContext : nnState) {
+        // Once we find one NN in ACTIVE state, we assume this
+        // cluster is available.
+        if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
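The InvocationTargetException handling above follows the standard reflection pattern: the remote failure is the cause of the exception, not the exception itself. A self-contained illustration:

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    public class UnwrapDemo {
      public static void fail() throws IOException {
        throw new IOException("boom");
      }
      public static void main(String[] args) throws Exception {
        Method m = UnwrapDemo.class.getMethod("fail");
        try {
          m.invoke(null);
        } catch (InvocationTargetException e) {
          // The target's own exception is the cause
          System.out.println(e.getCause().getMessage()); // prints "boom"
        }
      }
    }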
+
+  /**
+   * Get a clean copy of the exception. Sometimes the exceptions returned by
+   * the server contain the full stack trace in the message.
+   *
+   * @param ioe Exception to clean up.
+   * @return Copy of the original exception with a clean message.
+   */
+  private static IOException getCleanException(IOException ioe) {
+    IOException ret = null;
+
+    String msg = ioe.getMessage();
+    Throwable cause = ioe.getCause();
+    StackTraceElement[] stackTrace = ioe.getStackTrace();
+
+    // Clean the message by removing the stack trace
+    int index = msg.indexOf("\n");
+    if (index > 0) {
+      String[] msgSplit = msg.split("\n");
+      msg = msgSplit[0];
+
+      // Parse the stack trace from the message
+      List<StackTraceElement> elements = new LinkedList<>();
+      for (int i = 1; i < msgSplit.length; i++) {
+        String line = msgSplit[i];
+        Matcher matcher = STACK_TRACE_PATTERN.matcher(line);
+        if (matcher.find()) {
+          String declaringClass = matcher.group(1);
+          String methodName = matcher.group(2);
+          String fileName = matcher.group(3);
+          int lineNumber = Integer.parseInt(matcher.group(4));
+          StackTraceElement element = new StackTraceElement(
+              declaringClass, methodName, fileName, lineNumber);
+          elements.add(element);
+        }
+      }
+      stackTrace = elements.toArray(new StackTraceElement[elements.size()]);
+    }
+
+    // Create the new output exception
+    if (ioe instanceof RemoteException) {
+      RemoteException re = (RemoteException)ioe;
+      ret = new RemoteException(re.getClassName(), msg);
+    } else {
+      // Try the simple constructor and initialize the fields
+      Class<? extends IOException> ioeClass = ioe.getClass();
+      try {
+        Constructor<? extends IOException> constructor =
+            ioeClass.getDeclaredConstructor(String.class);
+        ret = constructor.newInstance(msg);
+      } catch (ReflectiveOperationException e) {
+        // If there are errors, just use the input one
+        LOG.error("Could not create exception {}", ioeClass.getSimpleName(), e);
+        ret = ioe;
+      }
+    }
+    if (ret != null) {
+      if (ret != ioe) {
+        // Only set the cause on a newly created exception; calling
+        // initCause() on one that already has a cause throws
+        // IllegalStateException
+        ret.initCause(cause);
+      }
+      ret.setStackTrace(stackTrace);
+    }
+
+    return ret;
+  }
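STACK_TRACE_PATTERN is declared earlier in this class and is not visible in this hunk. A plausible form, with the four capture groups getCleanException() consumes (declaring class, method, file, line number):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class StackTracePatternDemo {
      // Assumed shape of the pattern; matches lines such as
      // "\tat org.example.Foo.bar(Foo.java:42)".
      private static final Pattern STACK_TRACE_PATTERN =
          Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");

      public static void main(String[] args) {
        Matcher m = STACK_TRACE_PATTERN.matcher(
            "\tat org.example.Foo.bar(Foo.java:42)");
        if (m.find()) {
          StackTraceElement e = new StackTraceElement(
              m.group(1), m.group(2), m.group(3),
              Integer.parseInt(m.group(4)));
          System.out.println(e); // org.example.Foo.bar(Foo.java:42)
        }
      }
    }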
+
+  /**
+   * Invokes a ClientProtocol method. Determines the target nameservice via a
+   * provided block.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param block Block used to determine the appropriate nameservice.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException If the invocation fails.
+   */
+  public Object invokeSingle(final ExtendedBlock block, RemoteMethod method)
+      throws IOException {
+    String bpId = block.getBlockPoolId();
+    return invokeSingleBlockPool(bpId, method);
+  }
+
+  /**
+   * Invokes a ClientProtocol method. Determines the target nameservice using
+   * the block pool ID.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param bpId Block pool identifier.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException If the invocation fails.
+   */
+  public Object invokeSingleBlockPool(final String bpId, RemoteMethod method)
+      throws IOException {
+    String nsId = getNameserviceForBlockPoolId(bpId);
+    return invokeSingle(nsId, method);
+  }
+
+  /**
+   * Invokes a ClientProtocol method against the specified namespace.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param nsId Target namespace for the method.
+   * @param method The remote method and parameters to invoke.
+   * @return The result of invoking the method.
+   * @throws IOException If the invocation fails.
+   */
+  public Object invokeSingle(final String nsId, RemoteMethod method)
+      throws IOException {
+    UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    List<? extends FederationNamenodeContext> nns =
+        getNamenodesForNameservice(nsId);
+    RemoteLocationContext loc = new RemoteLocation(nsId, "/");
+    return invokeMethod(ugi, nns, method.getMethod(), method.getParams(loc));
+  }
+
+  /**
+   * Invokes a single proxy call for a single location.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param location RemoteLocation to invoke.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @return The result of invoking the method if successful.
+   * @throws IOException If the invocation fails.
+   */
+  public Object invokeSingle(final RemoteLocationContext location,
+      RemoteMethod remoteMethod) throws IOException {
+    List<RemoteLocationContext> locations = Collections.singletonList(location);
+    return invokeSequential(locations, remoteMethod);
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until a call returns without throwing a remote exception.
+   *
+   * @param locations List of locations/nameservices to call sequentially.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @return The result of the first successful call, or if no calls are
+   *         successful, the result of the first RPC call executed.
+   * @throws IOException If the success condition is not met and one of the
+   *           RPC calls generated a remote exception.
+   */
+  public Object invokeSequential(
+      final List<? extends RemoteLocationContext> locations,
+      final RemoteMethod remoteMethod) throws IOException {
+    return invokeSequential(locations, remoteMethod, null, null);
+  }
+
+  /**
+   * Invokes sequential proxy calls to different locations. Continues to invoke
+   * calls until the success condition is met, or until all locations have been
+   * attempted.
+   *
+   * The success condition may be specified by:
+   * <ul>
+   * <li>An expected result class
+   * <li>An expected result value
+   * </ul>
+   *
+   * If no expected result class/value is specified, the success condition is
+   * a call that does not throw a remote exception.
+   *
+   * @param locations List of locations/nameservices to call sequentially.
+   * @param remoteMethod The remote method and parameters to invoke.
+   * @param expectedResultClass In order to be considered a positive result,
+   *          the return type must be of this class.
+   * @param expectedResultValue In order to be considered a positive result,
+   *          the return value must equal the value of this object.
+   * @return The result of the first successful call, or if no calls are
+   *         successful, the result of the first RPC call executed.
+   * @throws IOException If the success condition is not met, the first remote
+   *           exception generated is re-thrown.
+   */
+  public <T> T invokeSequential(
+      final List<? extends RemoteLocationContext> locations,
+      final RemoteMethod remoteMethod, Class<T> expectedResultClass,
+      Object expectedResultValue) throws IOException {
+
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = remoteMethod.getMethod();
+    IOException firstThrownException = null;
+    Object firstResult = null;
+    // Invoke in priority order
+    for (final RemoteLocationContext loc : locations) {
+      String ns = loc.getNameserviceId();
+      List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(ns);
+      try {
+        Object[] params = remoteMethod.getParams(loc);
+        Object result = invokeMethod(ugi, namenodes, m, params);
+        // Check if the result is what we expected
+        if (isExpectedClass(expectedResultClass, result) &&
+            isExpectedValue(expectedResultValue, result)) {
+          // Valid result, stop here
+          @SuppressWarnings("unchecked")
+          T ret = (T) result;
+          return ret;
+        }
+        if (firstResult == null) {
+          firstResult = result;
+        }
+      } catch (IOException ioe) {
+        // Record it and move on
+        if (firstThrownException == null) {
+          firstThrownException = ioe;
+        }
+      } catch (Exception e) {
+        // Unusual error, ClientProtocol calls always use IOException (or
+        // RemoteException). Re-wrap in IOException for compatibility with
+        // ClientProtocol.
+        LOG.error("Unexpected exception {} proxying {} to {}",
+            e.getClass(), m.getName(), ns, e);
+        if (firstThrownException == null) {
+          firstThrownException = new IOException(
+              "Unexpected exception proxying API " + e.getMessage(), e);
+        }
+      }
+    }
+
+    if (firstThrownException != null) {
+      // Re-throw the first exception thrown for compatibility
+      throw firstThrownException;
+    }
+    // Return the first result, whether or not it is the expected value
+    @SuppressWarnings("unchecked")
+    T ret = (T) firstResult;
+    return ret;
+  }
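A hypothetical caller sketch (the locations variable, the permission value, and the rpcClient field are assumed, not defined in this hunk): create a directory across the locations of a mount point, stopping at the first nameservice that reports success:

    // RemoteParam() expands to each location's destination path.
    final RemoteMethod method = new RemoteMethod("mkdirs",
        new Class<?>[] {String.class, FsPermission.class, boolean.class},
        new RemoteParam(), permission, true);
    Boolean success = rpcClient.invokeSequential(
        locations, method, Boolean.class, Boolean.TRUE);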
+
+  /**
+   * Checks if a result matches the required result class.
+   *
+   * @param expectedClass Required result class, null to skip the check.
+   * @param obj The result to check.
+   * @return True if the result is an instance of the required class or if the
+   *         expected class is null.
+   */
+  private static boolean isExpectedClass(Class<?> expectedClass, Object obj) {
+    if (expectedClass == null) {
+      return true;
+    } else if (obj == null) {
+      return false;
+    } else {
+      return expectedClass.isInstance(obj);
+    }
+  }
+
+  /**
+   * Checks if a result matches the expected value.
+   *
+   * @param expectedValue The expected value, null to skip the check.
+   * @param value The result to check.
+   * @return True if the result is equal to the expected value or if the
+   *         expected value is null.
+   */
+  private static boolean isExpectedValue(Object expectedValue, Object value) {
+    if (expectedValue == null) {
+      return true;
+    } else if (value == null) {
+      return false;
+    } else {
+      return value.equals(expectedValue);
+    }
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. The results
+   * are discarded.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data
+   *          results successfully received are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @throws IOException If all the calls throw an exception.
+   */
+  public <T extends RemoteLocationContext> void invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby) throws IOException {
+    invokeConcurrent(locations, method, requireResponse, standby, void.class);
+  }
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns a
+   * map of results.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data
+   *          results successfully received are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster: nsId -> result.
+   * @throws IOException If requireResponse=true and any of the calls throw an
+   *           exception.
+   */
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby, Class<R> clazz)
+      throws IOException {
+    return invokeConcurrent(
+        locations, method, requireResponse, standby, -1, clazz);
+  }
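Concretely, the two helpers compose into the success condition checked by invokeSequential(); with illustrative values:

    isExpectedClass(Boolean.class, Boolean.TRUE);  // true: instance matches
    isExpectedClass(null, "anything");             // true: class check skipped
    isExpectedValue(Boolean.TRUE, Boolean.FALSE);  // false: values differ
    isExpectedValue(null, "anything");             // true: value check skipped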
+
+  /**
+   * Invokes multiple concurrent proxy calls to different clients. Returns a
+   * map of results.
+   *
+   * Re-throws exceptions generated by the remote RPC call as either
+   * RemoteException or IOException.
+   *
+   * @param <T> The type of the remote location.
+   * @param <R> The type of the remote method return.
+   * @param locations List of remote locations to call concurrently.
+   * @param method The remote method and parameters to invoke.
+   * @param requireResponse If true an exception will be thrown if all calls do
+   *          not complete. If false exceptions are ignored and all data
+   *          results successfully received are returned.
+   * @param standby If the requests should go to the standby namenodes too.
+   * @param timeOutMs Timeout for each individual call.
+   * @param clazz Type of the remote return type.
+   * @return Result of invoking the method per subcluster: nsId -> result.
+   * @throws IOException If requireResponse=true and any of the calls throw an
+   *           exception.
+   */
+  @SuppressWarnings("unchecked")
+  public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
+      final Collection<T> locations, final RemoteMethod method,
+      boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
+      throws IOException {
+
+    final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+    final Method m = method.getMethod();
+
+    if (locations.size() == 1) {
+      // Shortcut, just one call
+      T location = locations.iterator().next();
+      String ns = location.getNameserviceId();
+      final List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(ns);
+      Object[] paramList = method.getParams(location);
+      Object result = invokeMethod(ugi, namenodes, m, paramList);
+      return Collections.singletonMap(location, clazz.cast(result));
+    }
+
+    List<T> orderedLocations = new LinkedList<>();
+    Set<Callable<Object>> callables = new HashSet<>();
+    for (final T location : locations) {
+      String nsId = location.getNameserviceId();
+      final List<? extends FederationNamenodeContext> namenodes =
+          getNamenodesForNameservice(nsId);
+      final Object[] paramList = method.getParams(location);
+      if (standby) {
+        // Call the objectGetter to all NNs (including standby)
+        for (final FederationNamenodeContext nn : namenodes) {
+          String nnId = nn.getNamenodeId();
+          final List<FederationNamenodeContext> nnList =
+              Collections.singletonList(nn);
+          T nnLocation = location;
+          if (location instanceof RemoteLocation) {
+            nnLocation = (T) new RemoteLocation(nsId, nnId, location.getDest());
+          }
+          orderedLocations.add(nnLocation);
+          callables.add(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+              return invokeMethod(ugi, nnList, m, paramList);
+            }
+          });
+        }
+      } else {
+        // Call the objectGetter in order of nameservices in the NS list
+        orderedLocations.add(location);
+        callables.add(new Callable<Object>() {
+          @Override
+          public Object call() throws Exception {
+            return invokeMethod(ugi, namenodes, m, paramList);
+          }
+        });
+      }
+    }
+
+    if (rpcMonitor != null) {
+      rpcMonitor.proxyOp();
+    }
+
+    try {
+      List<Future<Object>> futures = null;
+      if (timeOutMs > 0) {
+        futures = executorService.invokeAll(
+            callables, timeOutMs, TimeUnit.MILLISECONDS);
+      } else {
+        futures = executorService.invokeAll(callables);
+      }
+      Map<T, R> results = new TreeMap<>();
+      Map<T, IOException> exceptions = new TreeMap<>();
+      for (int i = 0; i < futures.size(); i++) {
+        T location = orderedLocations.get(i);
+        try {
+          Future<Object> future = futures.get(i);
+          Object result = future.get();
+          results.put(location, clazz.cast(result));
+        } catch (CancellationException ce) {
+          String msg = "Invocation to \"" + location + "\" for \"" +
+              method + "\" timed out";
+          LOG.error(msg);
+          IOException ioe = new IOException(msg);
+          exceptions.put(location, ioe);
+        } catch (ExecutionException ex) {
+          Throwable cause = ex.getCause();
+          LOG.debug("Cannot execute {} in {}: {}",
+              m.getName(), location, cause.getMessage());
+
+          // Convert into IOException if needed
+          IOException ioe = null;
+          if (cause instanceof IOException) {
+            ioe = (IOException) cause;
+          } else {
+            ioe = new IOException("Unhandled exception while proxying API " +
+                m.getName() + ": " + cause.getMessage(), cause);
+          }
+
+          // Response from all servers required, use this error.
+          if (requireResponse) {
+            throw ioe;
+          }
+
+          // Store the exceptions
+          exceptions.put(location, ioe);
+        }
+      }
+
+      // Throw the exception for the first location if there are no results
+      if (results.isEmpty()) {
+        T location = orderedLocations.get(0);
+        IOException ioe = exceptions.get(location);
+        if (ioe != null) {
+          throw ioe;
+        }
+      }
+
+      return results;
+    } catch (InterruptedException ex) {
+      LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
+      throw new IOException(
+          "Unexpected error while invoking API " + ex.getMessage(), ex);
+    }
+  }
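A hypothetical caller sketch for the concurrent path (clientName and the rpcClient field are assumed; FederationNamespaceInfo entries come from the resolver): renew a client's lease on every known subcluster, ignoring per-location results:

    final RemoteMethod method = new RemoteMethod("renewLease",
        new Class<?>[] {String.class}, clientName);
    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
    // void overload: results are discarded, individual failures tolerated
    rpcClient.invokeConcurrent(nss, method, false, false);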
+
+  /**
+   * Get a prioritized list of NNs that share the same nameservice ID (in the
+   * same namespace). NNs that are reported as ACTIVE will be first in the
+   * list.
+   *
+   * @param nsId The nameservice ID for the namespace.
+   * @return A prioritized list of NNs to use for communication.
+   * @throws IOException If a NN cannot be located for the nameservice ID.
+   */
+  private List<? extends FederationNamenodeContext> getNamenodesForNameservice(
+      final String nsId) throws IOException {
+
+    final List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForNameserviceId(nsId);
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("Cannot locate a registered namenode for " + nsId +
+          " from " + this.routerId);
+    }
+    return namenodes;
+  }
+
+  /**
+   * Get a prioritized list of NNs that share the same block pool ID (in the
+   * same namespace). NNs that are reported as ACTIVE will be first in the
+   * list.
+   *
+   * @param bpId The block pool ID for the namespace.
+   * @return A prioritized list of NNs to use for communication.
+   * @throws IOException If a NN cannot be located for the block pool ID.
+   */
+  private List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
+      final String bpId) throws IOException {
+
+    List<? extends FederationNamenodeContext> namenodes =
+        namenodeResolver.getNamenodesForBlockPoolId(bpId);
+
+    if (namenodes == null || namenodes.isEmpty()) {
+      throw new IOException("Cannot locate a registered namenode for " + bpId +
+          " from " + this.routerId);
+    }
+    return namenodes;
+  }
+
+  /**
+   * Get the nameservice identifier for a block pool.
+   *
+   * @param bpId Identifier of the block pool.
+   * @return Nameservice identifier.
+   * @throws IOException If a NN cannot be located for the block pool ID.
+   */
+  private String getNameserviceForBlockPoolId(final String bpId)
+      throws IOException {
+    List<? extends FederationNamenodeContext> namenodes =
+        getNamenodesForBlockPoolId(bpId);
+    FederationNamenodeContext namenode = namenodes.get(0);
+    return namenode.getNameserviceId();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
new file mode 100644
index 0000000..df9aa11
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+
+/**
+ * Metrics and monitoring interface for the router RPC server. Allows pluggable
+ * diagnostics and monitoring services to be attached.
+ */
+public interface RouterRpcMonitor {
+
+  /**
+   * Initialize the monitor.
+   * @param conf Configuration for the monitor.
+   * @param server RPC server.
+   * @param store State Store.
+   */
+  void init(
+      Configuration conf, RouterRpcServer server, StateStoreService store);
+
+  /**
+   * Get Router RPC metrics info.
+   * @return The instance of FederationRPCMetrics.
+   */
+  FederationRPCMetrics getRPCMetrics();
+
+  /**
+   * Close the monitor.
+   */
+  void close();
+
+  /**
+   * Start processing an operation on the Router.
+   */
+  void startOp();
+
+  /**
+   * Start proxying an operation to the Namenode.
+   * @return Id of the thread doing the proxying.
+   */
+  long proxyOp();
+
+  /**
+   * Mark a proxy operation as completed.
+   * @param success If the operation was successful.
+   */
+  void proxyOpComplete(boolean success);
+
+  /**
+   * Failed to proxy an operation to a Namenode because it was in standby.
+   */
+  void proxyOpFailureStandby();
+
+  /**
+   * Failed to proxy an operation to a Namenode because of an unexpected
+   * exception.
+   */
+  void proxyOpFailureCommunicate();
+
+  /**
+   * Failed to proxy an operation because it is not implemented.
+   */
+  void proxyOpNotImplemented();
+
+  /**
+   * Retried the proxying of an operation to a Namenode because of an
+   * unexpected exception.
+   */
+  void proxyOpRetries();
+
+  /**
+   * The Router cannot contact the State Store in an operation.
+   */
+  void routerFailureStateStore();
+
+  /**
+   * The Router is in safe mode.
+   */
+  void routerFailureSafemode();
+
+  /**
+   * A path is locked.
+   */
+  void routerFailureLocked();
+
+  /**
+   * A path is in a read-only mount point.
+   */
+  void routerFailureReadOnly();
+}
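To make the contract concrete, a minimal no-op implementation of this interface might look as follows (an illustrative sketch, not the pluggable implementation shipped with this feature):

    package org.apache.hadoop.hdfs.server.federation.router;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
    import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;

    /** Illustrative monitor that ignores every callback. */
    public class NoopRouterRpcMonitor implements RouterRpcMonitor {
      @Override
      public void init(Configuration conf, RouterRpcServer server,
          StateStoreService store) {}
      @Override
      public FederationRPCMetrics getRPCMetrics() {
        return null; // no metrics collected
      }
      @Override
      public void close() {}
      @Override
      public void startOp() {}
      @Override
      public long proxyOp() {
        return Thread.currentThread().getId();
      }
      @Override
      public void proxyOpComplete(boolean success) {}
      @Override
      public void proxyOpFailureStandby() {}
      @Override
      public void proxyOpFailureCommunicate() {}
      @Override
      public void proxyOpNotImplemented() {}
      @Override
      public void proxyOpRetries() {}
      @Override
      public void routerFailureStateStore() {}
      @Override
      public void routerFailureSafemode() {}
      @Override
      public void routerFailureLocked() {}
      @Override
      public void routerFailureReadOnly() {}
    }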