Author: wheat9
Date: Tue Apr 29 06:04:14 2014
New Revision: 1590876

URL: http://svn.apache.org/r1590876
Log:
HADOOP-10508. RefreshCallQueue fails when authorization is enabled. Contributed by Chris Li.
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1590876&r1=1590875&r2=1590876&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Tue Apr 29 06:04:14 2014
@@ -32,6 +32,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 
 /**
  * {@link PolicyProvider} for HDFS protocols.
@@ -64,7 +65,10 @@ public class HDFSPolicyProvider extends
         RefreshUserMappingsProtocol.class),
     new Service(
         CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS,
-        GetUserMappingsProtocol.class)
+        GetUserMappingsProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE,
+        RefreshCallQueueProtocol.class)
   };
 
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1590876&r1=1590875&r2=1590876&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Tue Apr 29 06:04:14 2014
@@ -1056,7 +1056,7 @@ public class DFSAdmin extends FsShell {
         NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
             RefreshCallQueueProtocol.class).getProxy();
 
-    // Refresh the user-to-groups mappings
+    // Refresh the call queue
     refreshProtocol.refreshCallQueue();
 
     return 0;
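
[Editorial note, not part of the patch] The HDFSPolicyProvider change above is the substance of the fix: once RefreshCallQueueProtocol is registered with the policy provider, the NameNode's service-level authorization check has an ACL entry to consult instead of rejecting the protocol as unknown when hadoop.security.authorization is enabled. The sketch below is illustrative only; it uses the same authorization switch the new test sets and the ACL key constant referenced by the patch, while the "hdfs" user list and the idea of carrying the ACL in a plain Configuration (rather than the NameNode's hadoop-policy.xml, where it would normally live) are example assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class RefreshCallQueueAclExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Enable service-level authorization, as TestRefreshCallQueue does.
        conf.set("hadoop.security.authorization", "true");
        // Attach an ACL to RefreshCallQueueProtocol via the key this patch registers.
        // "hdfs" is an example user list; "*" would allow all users.
        conf.set(
            CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE,
            "hdfs");
        System.out.println("refresh-callqueue ACL key = "
            + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE);
      }
    }
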
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java?rev=1590876&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestRefreshCallQueue.java Tue Apr 29 06:04:14 2014
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRefreshCallQueue {
+  private MiniDFSCluster cluster;
+  private Configuration config;
+  private FileSystem fs;
+  static int mockQueueConstructions;
+  static int mockQueuePuts;
+  private static final int NNPort = 54222;
+  private static String CALLQUEUE_CONFIG_KEY = "ipc." + NNPort + ".callqueue.impl";
+
+  @Before
+  public void setUp() throws Exception {
+    // We want to count additional events, so we reset here
+    mockQueueConstructions = 0;
+    mockQueuePuts = 0;
+
+    config = new Configuration();
+    config.setClass(CALLQUEUE_CONFIG_KEY,
+        MockCallQueue.class, BlockingQueue.class);
+    config.set("hadoop.security.authorization", "true");
+
+    FileSystem.setDefaultUri(config, "hdfs://localhost:" + NNPort);
+    fs = FileSystem.get(config);
+    cluster = new MiniDFSCluster.Builder(config).nameNodePort(NNPort).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if(cluster!=null) {
+      cluster.shutdown();
+    }
+  }
+
+  @SuppressWarnings("serial")
+  public static class MockCallQueue<E> extends LinkedBlockingQueue<E> {
+    public MockCallQueue(int cap, String ns, Configuration conf) {
+      super(cap);
+      mockQueueConstructions++;
+    }
+
+    public void put(E e) throws InterruptedException {
+      super.put(e);
+      mockQueuePuts++;
+    }
+  }
+
+  // Returns true if mock queue was used for put
+  public boolean canPutInMockQueue() throws IOException {
+    int putsBefore = mockQueuePuts;
+    fs.exists(new Path("/")); // Make an RPC call
+    return mockQueuePuts > putsBefore;
+  }
+
+  @Test
+  public void testRefresh() throws Exception {
+    assertTrue("Mock queue should have been constructed", mockQueueConstructions > 0);
+    assertTrue("Puts are routed through MockQueue", canPutInMockQueue());
+    int lastMockQueueConstructions = mockQueueConstructions;
+
+    // Replace queue with the queue specified in core-site.xml, which would be the LinkedBlockingQueue
+    DFSAdmin admin = new DFSAdmin(config);
+    String [] args = new String[]{"-refreshCallQueue"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should return 0", 0, exitCode);
+
+    assertEquals("Mock queue should have no additional constructions", lastMockQueueConstructions, mockQueueConstructions);
+
+    try {
assertFalse("Puts are routed through LBQ instead of MockQueue", canPutInMockQueue()); + } catch (IOException ioe){ + fail("Could not put into queue at all"); + } + } + +} \ No newline at end of file