Repository: hbase
Updated Branches:
  refs/heads/branch-2 08212e50f -> e5a8f162a


HBASE-18448 Added refresh HFiles coprocessor endpoint

Signed-off-by: anoopsamjohn <anoopsamj...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e5a8f162
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e5a8f162
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e5a8f162

Branch: refs/heads/branch-2
Commit: e5a8f162a2ded86b42f40650d879fb730afb84dc
Parents: 08212e5
Author: Ajay Jadhav <jadha...@amazon.com>
Authored: Mon Aug 21 17:24:28 2017 -0700
Committer: anoopsamjohn <anoopsamj...@gmail.com>
Committed: Thu Aug 24 20:44:45 2017 +0530

----------------------------------------------------------------------
 .../client/example/RefreshHFilesClient.java     |  95 ++++++++++
 .../example/RefreshHFilesEndpoint.java          |  86 +++++++++
 .../src/main/protobuf/RefreshHFiles.proto       |  36 ++++
 .../example/TestRefreshHFilesEndpoint.java      | 177 +++++++++++++++++++
 .../hadoop/hbase/HBaseTestingUtility.java       |  18 ++
 5 files changed, 412 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a8f162/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java
----------------------------------------------------------------------
diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java
 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java
new file mode 100644
index 0000000..0401959
--- /dev/null
+++ 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/RefreshHFilesClient.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * This client class is for invoking the refresh HFile function deployed on the
+ * Region Server side via the RefreshHFilesService.
+ */
+public class RefreshHFilesClient implements Closeable {
+  private static final Log LOG = LogFactory.getLog(RefreshHFilesClient.class);
+  private final Connection connection;
+
+  /**
+   * Creates a client and opens a {@link Connection} using the given configuration.
+   *
+   * @param cfg configuration handed to {@link ConnectionFactory#createConnection(Configuration)}
+   * @throws RuntimeException wrapping the {@link IOException} raised when the connection
+   *                          cannot be created
+   */
+  public RefreshHFilesClient(Configuration cfg) {
+    try {
+      this.connection = ConnectionFactory.createConnection(cfg);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to release when there is no connection or it has already been closed.
+    if (this.connection == null || this.connection.isClosed()) {
+      return;
+    }
+    this.connection.close();
+  }
+
+  /**
+   * Opens the named table, delegates to {@link #refreshHFiles(Table)} and closes the table
+   * again once the call returns or throws.
+   *
+   * @param tableName name of the table to look up on this client's connection
+   * @throws Throwable whatever the table lookup or the delegated refresh call throws
+   */
+  public void refreshHFiles(final TableName tableName) throws Throwable {
+    try (Table target = connection.getTable(tableName)) {
+      refreshHFiles(target);
+    }
+  }
+
+  public void refreshHFiles(final Table table) throws Throwable {
+    final RefreshHFilesProtos.RefreshHFilesRequest request = 
RefreshHFilesProtos.RefreshHFilesRequest
+                                                               
.getDefaultInstance();
+    table.coprocessorService(RefreshHFilesProtos.RefreshHFilesService.class, 
HConstants.EMPTY_START_ROW,
+                             HConstants.EMPTY_END_ROW,
+                             new 
Batch.Call<RefreshHFilesProtos.RefreshHFilesService,
+                                             
RefreshHFilesProtos.RefreshHFilesResponse>() {
+                               @Override
+                               public 
RefreshHFilesProtos.RefreshHFilesResponse call(
+                                 RefreshHFilesProtos.RefreshHFilesService 
refreshHFilesService)
+                                 throws IOException {
+                                 ServerRpcController controller = new 
ServerRpcController();
+                                 
BlockingRpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> rpcCallback =
+                                   new BlockingRpcCallback<>();
+                                 
refreshHFilesService.refreshHFiles(controller, request, rpcCallback);
+                                 if (controller.failedOnException()) {
+                                   throw controller.getFailedOn();
+                                 }
+                                 return rpcCallback.get();
+                               }
+                             });
+    LOG.debug("Done refreshing HFiles");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a8f162/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
new file mode 100644
index 0000000..5b97411
--- /dev/null
+++ 
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RefreshHFilesEndpoint.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.protobuf.generated.RefreshHFilesProtos;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+import java.io.IOException;
+
+/**
+ * Coprocessor endpoint to refresh HFiles on replica.
+ * <p>
+ * For the protocol buffer definition of the RefreshHFilesService, see the source file located
+ * under hbase-examples/src/main/protobuf/RefreshHFiles.proto.
+ * </p>
+ */
+public class RefreshHFilesEndpoint extends 
RefreshHFilesProtos.RefreshHFilesService
+  implements Coprocessor, CoprocessorService {
+  protected static final Log LOG = 
LogFactory.getLog(RefreshHFilesEndpoint.class);
+  private RegionCoprocessorEnvironment env;
+
+  public RefreshHFilesEndpoint() {
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Refreshes the store files of every store returned by the region this endpoint runs on.
+   * <p>
+   * An {@link IOException} raised while refreshing is logged and set on the controller via
+   * {@code CoprocessorRpcUtils.setControllerException}; the callback is then still run with
+   * the default response instance.
+   * </p>
+   *
+   * @param controller receives the exception if refreshing fails
+   * @param request    the (empty) request message; not inspected
+   * @param done       callback that is always run with the default response
+   */
+  @Override
+  public void refreshHFiles(RpcController controller,
+                            RefreshHFilesProtos.RefreshHFilesRequest request,
+                            RpcCallback<RefreshHFilesProtos.RefreshHFilesResponse> done) {
+    try {
+      for (Store store : env.getRegion().getStores()) {
+        // Only build the message when it will actually be emitted.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Refreshing HFiles for region: "
+                      + store.getRegionInfo().getRegionNameAsString()
+                      + " and store: " + store.getColumnFamilyName()
+                      + " class: " + store.getClass());
+        }
+        store.refreshStoreFiles();
+      }
+    } catch (IOException ioe) {
+      LOG.error("Exception while trying to refresh store files: ", ioe);
+      CoprocessorRpcUtils.setControllerException(controller, ioe);
+    }
+    done.run(RefreshHFilesProtos.RefreshHFilesResponse.getDefaultInstance());
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    // Reject any environment that is not a region environment before storing it.
+    if (!(env instanceof RegionCoprocessorEnvironment)) {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+    this.env = (RegionCoprocessorEnvironment) env;
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a8f162/hbase-examples/src/main/protobuf/RefreshHFiles.proto
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/protobuf/RefreshHFiles.proto 
b/hbase-examples/src/main/protobuf/RefreshHFiles.proto
new file mode 100644
index 0000000..11cbab0
--- /dev/null
+++ b/hbase-examples/src/main/protobuf/RefreshHFiles.proto
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "RefreshHFilesProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message RefreshHFilesRequest {
+}
+
+message RefreshHFilesResponse {
+}
+
+service RefreshHFilesService {
+    rpc refreshHFiles(RefreshHFilesRequest)
+      returns (RefreshHFilesResponse);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a8f162/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
 
b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
new file mode 100644
index 0000000..a037f85
--- /dev/null
+++ 
b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.example.RefreshHFilesClient;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@Category(MediumTests.class)
+public class TestRefreshHFilesEndpoint {
+  private static final Log LOG = 
LogFactory.getLog(TestRefreshHFilesEndpoint.class);
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static final int NUM_MASTER = 1;
+  private static final int NUM_RS = 2;
+  private static final TableName TABLE_NAME = 
TableName.valueOf("testRefreshRegionHFilesEP");
+  private static final byte[] FAMILY = Bytes.toBytes("family");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qualifier");
+  private static final byte[][] SPLIT_KEY = new byte[][] { Bytes.toBytes("30") 
};
+  private static final int NUM_ROWS = 5;
+  private static final String HFILE_NAME = "123abcdef";
+
+  private static Configuration CONF = HTU.getConfiguration();
+  private static MiniHBaseCluster cluster;
+  private static Table table;
+
+  /**
+   * Starts the mini cluster with the RefreshHFilesEndpoint coprocessor configured, creates
+   * the test table, loads rows into it and flushes it.
+   *
+   * @param regionImpl class name stored under {@code HConstants.REGION_IMPL}
+   * @throws AssertionError if any setup step fails, so the calling test is aborted instead of
+   *                        running against an incomplete cluster
+   */
+  public static void setUp(String regionImpl) {
+    try {
+      CONF.set(HConstants.REGION_IMPL, regionImpl);
+      CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+
+      CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+                      RefreshHFilesEndpoint.class.getName());
+      cluster = HTU.startMiniCluster(NUM_MASTER, NUM_RS);
+
+      // Create table
+      table = HTU.createTable(TABLE_NAME, FAMILY, SPLIT_KEY);
+
+      // this will create 2 regions spread across slaves
+      HTU.loadNumericRows(table, FAMILY, 1, 20);
+      HTU.flush(TABLE_NAME);
+    } catch (Exception ex) {
+      LOG.error("Couldn't finish setup", ex);
+      // Merely logging would let the test continue; a test that expects an IOException
+      // could then pass for the wrong reason.
+      throw new AssertionError("Couldn't finish setup", ex);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testRefreshRegionHFilesEndpoint() throws Exception {
+    setUp(HRegion.class.getName());
+    MasterFileSystem mfs = 
HTU.getMiniHBaseCluster().getMaster().getMasterFileSystem();
+    Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), TABLE_NAME);
+    for (Region region : cluster.getRegions(TABLE_NAME)) {
+      Path regionDir = new Path(tableDir, 
region.getRegionInfo().getEncodedName());
+      Path familyDir = new Path(regionDir, Bytes.toString(FAMILY));
+      HFileTestUtil
+        .createHFile(HTU.getConfiguration(), HTU.getTestFileSystem(), new 
Path(familyDir, HFILE_NAME), FAMILY,
+                     QUALIFIER, Bytes.toBytes("50"), Bytes.toBytes("60"), 
NUM_ROWS);
+    }
+    assertEquals(2, HTU.getNumHFiles(TABLE_NAME, FAMILY));
+    callRefreshRegionHFilesEndPoint();
+    assertEquals(4, HTU.getNumHFiles(TABLE_NAME, FAMILY));
+  }
+
+  @Test(expected = IOException.class)
+  public void testRefreshRegionHFilesEndpointWithException() throws 
IOException {
+    setUp(HRegionForRefreshHFilesEP.class.getName());
+    callRefreshRegionHFilesEndPoint();
+  }
+
+  /**
+   * Calls the refresh HFiles endpoint for {@code TABLE_NAME} through a RefreshHFilesClient
+   * built from {@code CONF}.
+   *
+   * @throws IOException if the call ran out of retries and the underlying cause is an
+   *                     IOException; the RetriesExhaustedException is attached as the cause
+   */
+  private void callRefreshRegionHFilesEndPoint() throws IOException {
+    // try-with-resources: the client owns a Connection that must be closed on every path.
+    try (RefreshHFilesClient refreshHFilesClient = new RefreshHFilesClient(CONF)) {
+      refreshHFilesClient.refreshHFiles(TABLE_NAME);
+    } catch (RetriesExhaustedException rex) {
+      if (rex.getCause() instanceof IOException) {
+        throw new IOException(rex);
+      }
+      // Any other cause used to be dropped silently; report it as a test failure instead.
+      LOG.error("Retries exhausted while calling the RefreshRegionHFilesEndpoint", rex);
+      fail("Couldn't call the RefreshRegionHFilesEndpoint");
+    } catch (Throwable ex) {
+      LOG.error("Couldn't call the RefreshRegionHFilesEndpoint", ex);
+      fail("Couldn't call the RefreshRegionHFilesEndpoint");
+    }
+  }
+
+  public static class HRegionForRefreshHFilesEP extends HRegion {
+    HStoreWithFaultyRefreshHFilesAPI store;
+
+    public HRegionForRefreshHFilesEP(final Path tableDir, final WAL wal, final 
FileSystem fs,
+                                     final Configuration confParam, final 
HRegionInfo regionInfo,
+                                     final TableDescriptor htd, final 
RegionServerServices rsServices) {
+      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
+    }
+
+    @Override
+    public List<Store> getStores() {
+      List<Store> allStores = new ArrayList<>(stores.size());
+      // Put the faulty store first so that iterating the returned list triggers the
+      // custom (failing) definition of the refresh HFiles API.
+      try {
+        if (this.store == null) {
+          this.store =
+            new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY), this.conf);
+        }
+        allStores.add(this.store);
+      } catch (IOException ioe) {
+        LOG.info("Couldn't instantiate custom store implementation", ioe);
+      }
+
+      allStores.addAll(stores.values());
+      return allStores;
+    }
+  }
+
+  public static class HStoreWithFaultyRefreshHFilesAPI extends HStore {
+    public HStoreWithFaultyRefreshHFilesAPI(final HRegion region, final 
ColumnFamilyDescriptor family,
+                                            final Configuration confParam) 
throws IOException {
+      super(region, family, confParam);
+    }
+
+    @Override
+    public void refreshStoreFiles() throws IOException {
+      throw new IOException();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e5a8f162/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 28d2a24..1ec5ecd 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -4425,4 +4425,22 @@ public class HBaseTestingUtility extends 
HBaseCommonTestingUtility {
     HBaseKerberosUtils.setKeytabFileForTesting(keytabFile.getAbsolutePath());
     return kdc;
   }
+
+  /**
+   * Sums {@link #getNumHFilesForRS} over every region server thread of the mini cluster.
+   *
+   * @param tableName table whose online regions are inspected
+   * @param family    column family whose store files are counted
+   * @return total store file count reported for that family across the region servers
+   */
+  public int getNumHFiles(final TableName tableName, final byte[] family) {
+    int total = 0;
+    for (RegionServerThread rsThread : getMiniHBaseCluster().getRegionServerThreads()) {
+      final HRegionServer regionServer = rsThread.getRegionServer();
+      total += getNumHFilesForRS(regionServer, tableName, family);
+    }
+    return total;
+  }
+
+  /**
+   * Adds up the store file count of the given family over the regions of the table that are
+   * online on the given region server.
+   *
+   * @param rs        region server whose online regions are inspected
+   * @param tableName table whose regions are considered
+   * @param family    column family whose store is queried in each region
+   * @return summed store file count for that family on this region server
+   */
+  public int getNumHFilesForRS(final HRegionServer rs, final TableName tableName,
+                               final byte[] family) {
+    int count = 0;
+    for (Region onlineRegion : rs.getOnlineRegions(tableName)) {
+      count += onlineRegion.getStore(family).getStorefilesCount();
+    }
+    return count;
+  }
 }

Reply via email to