HBASE-19369 Switch to Builder Pattern In WAL

This patch switches WAL file creation to the builder pattern by adding a helper
method. It also checks that the builder API is available (i.e. that HBase is
running on a Hadoop version that supports it).
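
For context, here is a rough, illustrative sketch (not part of this patch) of the
direct builder calls that the new DfsBuilderUtility helper issues via reflection,
assuming a Hadoop version whose DistributedFileSystem exposes createFile() and the
HdfsDataOutputStreamBuilder methods the patch reflects on:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class WalCreateSketch {
      // Roughly what CommonFSUtils.createForWal(fs, path, overwritable) amounts to
      // when the builder API is present: create the WAL file with replication forced
      // on, so the stream supports hflush/hsync even under a directory-wide erasure
      // coding policy.
      static FSDataOutputStream create(DistributedFileSystem dfs, Path path,
          boolean overwritable) throws IOException {
        return dfs.createFile(path)
            .overwrite(overwritable)
            .replicate()   // opt this file out of erasure coding
            .build();
      }
    }

The patch goes through reflection rather than calling these methods directly so the
same bytecode still loads, and falls back to fs.create()/fs.createNonRecursive(), on
Hadoop versions that predate the builder API.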

Amending-Author: Mike Drob <md...@apache.org>
Signed-off-by: tedyu <yuzhih...@gmail.com>
Signed-off-by: zhangduo <zhang...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ab184401
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ab184401
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ab184401

Branch: refs/heads/branch-2
Commit: ab18440107bc397932c90b4fb1e01139c897075b
Parents: 51b0d0e
Author: Alex Leblang <alex.lebl...@cloudera.com>
Authored: Thu Jul 26 15:47:13 2018 -0500
Committer: Mike Drob <md...@apache.org>
Committed: Fri Jul 27 23:42:53 2018 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/CommonFSUtils.java | 127 +++++++++++++++++++
 .../procedure2/store/wal/WALProcedureStore.java |   2 +-
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |   1 +
 .../regionserver/wal/ProtobufLogWriter.java     |  12 +-
 .../regionserver/wal/TestHBaseWalOnEC.java      | 119 +++++++++++++++++
 5 files changed, 256 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ab184401/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index 2e480e6..8924098 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -821,6 +821,133 @@ public abstract class CommonFSUtils {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
+  private static class DfsBuilderUtility {
+    static Class<?> dfsClass = null;
+    static Method createMethod;
+    static Method overwriteMethod;
+    static Method bufferSizeMethod;
+    static Method blockSizeMethod;
+    static Method recursiveMethod;
+    static Method replicateMethod;
+    static Method replicationMethod;
+    static Method buildMethod;
+    static boolean allMethodsPresent = false;
+
+    static {
+      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
+      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+      Class<?> builderClass = null;
+
+      try {
+        dfsClass = Class.forName(dfsName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
+      }
+      try {
+        builderClass = Class.forName(builderName);
+      } catch (ClassNotFoundException e) {
+        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+      }
+
+      if (dfsClass != null && builderClass != null) {
+        try {
+          createMethod = dfsClass.getMethod("createFile", Path.class);
+          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
+          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
+          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
+          recursiveMethod = builderClass.getMethod("recursive");
+          replicateMethod = builderClass.getMethod("replicate");
+          replicationMethod = builderClass.getMethod("replication", short.class);
+          buildMethod = builderClass.getMethod("build");
+
+          allMethodsPresent = true;
+          LOG.debug("Using builder API via reflection for DFS file creation.");
+        } catch (NoSuchMethodException e) {
+          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
+              e.getMessage());
+        }
+      }
+    }
+
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
+        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = bufferSizeMethod.invoke(builder, bufferSize);
+          builder = blockSizeMethod.invoke(builder, blockSize);
+          if (isRecursive) {
+            builder = recursiveMethod.invoke(builder);
+          }
+          builder = replicateMethod.invoke(builder);
+          builder = replicationMethod.invoke(builder, replication);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      if (isRecursive) {
+        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
+      }
+      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
+    }
+
+    /**
+     * Attempt to use builder API via reflection to create a file with the given parameters and
+     * replication enabled.
+     */
+    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
+        throws IOException {
+      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+        try {
+          Object builder;
+
+          builder = createMethod.invoke(fs, path);
+          builder = overwriteMethod.invoke(builder, overwritable);
+          builder = replicateMethod.invoke(builder);
+          return (FSDataOutputStream) buildMethod.invoke(builder);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          // Should have caught this failure during initialization, so log full trace here
+          LOG.warn("Couldn't use reflection with builder API", e);
+        }
+      }
+
+      return fs.create(path, overwritable);
+    }
+  }
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
+      throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable);
+  }
+
+  /**
+   * Attempt to use builder API via reflection to create a file with the given parameters and
+   * replication enabled.
+   * <p>
+   * Will not attempt to enable replication when passed an HFileSystem.
+   */
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
+      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
+        blockSize, isRecursive);
+  }
+
   // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
   // not until we attempt to reference it.
   private static class StreamCapabilities {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab184401/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 99001f7..cc3eea3 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -1028,7 +1028,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
     long startPos = -1;
     newLogFile = getLogFilePath(logId);
     try {
-      newStream = fs.create(newLogFile, false);
+      newStream = CommonFSUtils.createForWal(fs, newLogFile, false);
     } catch (FileAlreadyExistsException e) {
       LOG.error("Log file with id=" + logId + " already exists", e);
       return false;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab184401/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index f9d04b9..d1645f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -54,6 +54,7 @@ public final class AsyncFSOutputHelper {
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+    // This is not a Distributed File System, so it won't be erasure coded; no builder API needed
     if (createParent) {
       out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab184401/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index b4e2cbf..5c8e0d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -60,7 +60,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   public void close() throws IOException {
     if (this.output != null) {
       try {
-        if (!trailerWritten) writeWALTrailer();
+        if (!trailerWritten) {
+          writeWALTrailer();
+        }
         this.output.close();
       } catch (NullPointerException npe) {
         // Can get a NPE coming up from down in DFSClient$DFSOutputStream#close
@@ -73,7 +75,9 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   public void sync(boolean forceSync) throws IOException {
     FSDataOutputStream fsdos = this.output;
-    if (fsdos == null) return; // Presume closed
+    if (fsdos == null) {
+      return; // Presume closed
+    }
     fsdos.flush();
     if (forceSync) {
       fsdos.hsync();
@@ -90,8 +94,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
-    this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
-      null);
+    this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
+        blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
       if (!CommonFSUtils.hasCapability(output, "hflush")) {
         throw new StreamLacksCapabilityException("hflush");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ab184401/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
new file mode 100644
index 0000000..a7f1624
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestHBaseWalOnEC {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  private static final String HFLUSH = "hflush";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    try {
+      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
+          DistributedFileSystem.class);
+      enableAllECPolicies.invoke(null, fs);
+
+      DFSClient client = fs.getClient();
+      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
+          String.class, String.class);
+      setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy
+
+      try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
+        // If this comes back as having hflush then some test setup assumption is wrong.
+        // Fail the test so that a developer has to look and triage
+        assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
+      }
+    } catch (NoSuchMethodException e) {
+      // We're not testing anything interesting if EC is not available, so skip the rest of the test
+      Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
+    }
+
+    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testStreamCreate() throws IOException {
+    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
+        new Path("/testStreamCreate"), true)) {
+      assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
+    }
+  }
+
+  @Test
+  public void testFlush() throws IOException {
+    byte[] row = Bytes.toBytes("row");
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] cq = Bytes.toBytes("cq");
+    byte[] value = Bytes.toBytes("value");
+
+    TableName name = TableName.valueOf(getClass().getSimpleName());
+
+    Table t = util.createTable(name, cf);
+    t.put(new Put(row).addColumn(cf, cq, value));
+
+    util.getAdmin().flush(name);
+
+    assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
+  }
+}
+
