Repository: hbase
Updated Branches:
  refs/heads/master 5051ab4e7 -> 70f330dc8


http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
new file mode 100644
index 0000000..d55adef
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RegionServerCallable;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles with the old non-secure client, kept for backward
+ * compatibility. Will be removed once the old non-secure client is no longer supported.
+ */
+@RunWith(Parameterized.class)
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBulkLoad {
+  public TestHRegionServerBulkLoadWithOldClient(int duration) {
+    super(duration);
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldClient.class);
+
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private TableName tableName;
+
+    public AtomicHFileLoader(TableName tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
+          iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+          NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      RegionServerCallable<Void> callable =
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+        @Override
+        public Void call(int callTimeout) throws Exception {
+          LOG.info("Non-secure old client");
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          BulkLoadHFileRequest request =
+              RequestConverter
+                  .buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
+          getStub().bulkLoadHFile(null, request);
+          return null;
+        }
+      };
+      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+      caller.callWithRetries(callable, Integer.MAX_VALUE);
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 5 == 0) {
+        // 5 * 50 = 250 open file handles!
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          @Override
+          public Void call(int callTimeout) throws Exception {
+            LOG.debug("compacting " + getLocation() + " for row "
+                + Bytes.toStringBinary(getRow()));
+            AdminProtos.AdminService.BlockingInterface server =
+              conn.getAdmin(getLocation().getServerName());
+            CompactRegionRequest request =
+              RequestConverter.buildCompactRegionRequest(
+                getLocation().getRegionInfo().getRegionName(), true, null);
+            server.compactRegion(null, request);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        };
+        caller.callWithRetries(callable, Integer.MAX_VALUE);
+      }
+    }
+  }
+
+  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compations " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+}
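
Stripped of the retry plumbing, the old non-secure path the loader above
exercises reduces to a single RPC against the region server hosting the row.
A minimal sketch, assuming an open ClusterConnection "conn", a resolved
HRegionLocation "location", and the famPaths list built as in the test; the
variable names here are illustrative, not part of the patch:

    // Build and send a raw BulkLoadHFileRequest to the region server stub,
    // the way the old non-secure client does; the third argument asks the
    // server to assign a sequence id to the loaded files.
    byte[] regionName = location.getRegionInfo().getRegionName();
    ClientProtos.ClientService.BlockingInterface client =
        conn.getClient(location.getServerName());
    BulkLoadHFileRequest request =
        RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true, null, null);
    client.bulkLoadHFile(null, request);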

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
new file mode 100644
index 0000000..6de6261
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RegionServerCallable;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles with the old secure Endpoint client, kept for backward
+ * compatibility. Will be removed once the old secure Endpoint client is no longer supported.
+ */
+@RunWith(Parameterized.class)
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
+  public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
+    super(duration);
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException {
+    conf.setInt("hbase.rpc.timeout", 10 * 1000);
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+      "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+  }
+
+  public static class AtomicHFileLoader extends RepeatingTestThread {
+    final AtomicLong numBulkLoads = new AtomicLong();
+    final AtomicLong numCompactions = new AtomicLong();
+    private TableName tableName;
+
+    public AtomicHFileLoader(TableName tableName, TestContext ctx,
+        byte targetFamilies[][]) throws IOException {
+      super(ctx);
+      this.tableName = tableName;
+    }
+
+    public void doAnAction() throws Exception {
+      long iteration = numBulkLoads.getAndIncrement();
+      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
+          iteration));
+
+      // create HFiles for different column families
+      FileSystem fs = UTIL.getTestFileSystem();
+      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+      final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+          NUM_CFS);
+      for (int i = 0; i < NUM_CFS; i++) {
+        Path hfile = new Path(dir, family(i));
+        byte[] fam = Bytes.toBytes(family(i));
+        createHFile(fs, hfile, fam, QUAL, val, 1000);
+        famPaths.add(new Pair<>(fam, hfile.toString()));
+      }
+
+      // bulk load HFiles
+      final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+      Table table = conn.getTable(tableName);
+      final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
+      RegionServerCallable<Void> callable =
+          new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+            @Override
+            public Void call(int callTimeout) throws Exception {
+              LOG.debug("Going to connect to server " + getLocation() + " for row "
+                  + Bytes.toStringBinary(getRow()));
+              try (Table table = conn.getTable(getTableName())) {
+                boolean loaded =
+                    new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths, null,
+                        bulkToken, getLocation().getRegionInfo().getStartKey());
+              }
+              return null;
+            }
+          };
+      RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+      RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+      caller.callWithRetries(callable, Integer.MAX_VALUE);
+
+      // Periodically do compaction to reduce the number of open file handles.
+      if (numBulkLoads.get() % 5 == 0) {
+        // 5 * 50 = 250 open file handles!
+        callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) {
+          @Override
+          public Void call(int callTimeout) throws Exception {
+            LOG.debug("compacting " + getLocation() + " for row "
+                + Bytes.toStringBinary(getRow()));
+            AdminProtos.AdminService.BlockingInterface server =
+              conn.getAdmin(getLocation().getServerName());
+            CompactRegionRequest request =
+              RequestConverter.buildCompactRegionRequest(
+                getLocation().getRegionInfo().getRegionName(), true, null);
+            server.compactRegion(null, request);
+            numCompactions.incrementAndGet();
+            return null;
+          }
+        };
+        caller.callWithRetries(callable, Integer.MAX_VALUE);
+      }
+    }
+  }
+
+  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
+      throws Exception {
+    setupTable(tableName, 10);
+
+    TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+    ctx.addThread(loader);
+
+    List<AtomicScanReader> scanners = Lists.newArrayList();
+    for (int i = 0; i < numScanners; i++) {
+      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+      scanners.add(scanner);
+      ctx.addThread(scanner);
+    }
+
+    ctx.startThreads();
+    ctx.waitFor(millisToRun);
+    ctx.stop();
+
+    LOG.info("Loaders:");
+    LOG.info("  loaded " + loader.numBulkLoads.get());
+    LOG.info("  compations " + loader.numCompactions.get());
+
+    LOG.info("Scanners:");
+    for (AtomicScanReader scanner : scanners) {
+      LOG.info("  scanned " + scanner.numScans.get());
+      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
+    }
+  }
+}
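
The old secure Endpoint flow driven above is a two-step exchange with the
SecureBulkLoadEndpoint coprocessor: obtain a bulk token, then load the family
paths against the region's start key. A condensed sketch using only the calls
the test itself makes, assuming "conn", "tableName", "famPaths" and a region
"startKey" are set up as in the test:

    // Step 1: ask the endpoint to prepare a staging area and return a token.
    Table table = conn.getTable(tableName);
    String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
    // Step 2: load under that token; the null second argument is the user's
    // delegation token, not needed here since the test runs without
    // Hadoop security enabled.
    boolean loaded = new SecureBulkLoadEndpointClient(table)
        .bulkLoadHFiles(famPaths, null, bulkToken, startKey);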

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
index edad059..0e60877 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
@@ -61,6 +61,8 @@ public class TestPriorityRpc {
   public void setup() {
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
+    final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
+    TEST_UTIL.getDataTestDir(this.getClass().getName());
     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
     priority = regionServer.rpcServices.getPriority();

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
index f0e7ac9..274fe37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/SecureTestUtil.java
@@ -99,8 +99,7 @@ public class SecureTestUtil {
     conf.set("hadoop.security.authentication", "simple");
     conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
       "," + MasterSyncObserver.class.getName());
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName() +
-      "," + SecureBulkLoadEndpoint.class.getName());
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, AccessController.class.getName());
     conf.set(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, AccessController.class.getName());
     // Need HFile V3 for tags for security features
     conf.setInt(HFile.FORMAT_VERSION_KEY, 3);
