This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new a227a4a  HBASE-25692 Always try to close the WAL reader when we catch any exception (#3090)
a227a4a is described below

commit a227a4aa371b2a591c4bf9f92a27d2cdcf749eae
Author: Josh Elser <els...@apache.org>
AuthorDate: Mon Mar 29 15:15:58 2021 -0400

    HBASE-25692 Always try to close the WAL reader when we catch any exception (#3090)
    
    Some code paths throw exceptions other than IOException while
    initializing a WAL reader. However, the InputStream opened against the
    WAL's FileSystem was only closed when the exception was an IOException,
    leaking the stream (and its socket) otherwise. Close the stream, if it
    is open, regardless of the exception type.
    
    Co-authored-by: Josh Elser <jel...@cloudera.com>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
---
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |  58 ++++-----
 .../apache/hadoop/hbase/wal/FileSystemProxy.java   | 105 +++++++++++++++++
 .../apache/hadoop/hbase/wal/TestWALFactory.java    | 129 +++++++++++++++++++++
 3 files changed, 266 insertions(+), 26 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index dab65f3..333f24d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -320,7 +320,9 @@ public class WALFactory {
           reader = lrClass.getDeclaredConstructor().newInstance();
           reader.init(fs, path, conf, null);
           return reader;
-        } catch (IOException e) {
+        } catch (Exception e) {
+          // catch Exception so that we close reader for all exceptions. If we 
don't
+          // close the reader, we leak a socket.
           if (reader != null) {
             try {
               reader.close();
@@ -330,34 +332,38 @@ public class WALFactory {
             }
           }
 
-          String msg = e.getMessage();
-          if (msg != null
-              && (msg.contains("Cannot obtain block length")
-                  || msg.contains("Could not obtain the last block") || msg
-                    .matches("Blocklist for [^ ]* has changed.*"))) {
-            if (++nbAttempt == 1) {
-              LOG.warn("Lease should have recovered. This is not expected. 
Will retry", e);
-            }
-            if (reporter != null && !reporter.progress()) {
-              throw new InterruptedIOException("Operation is cancelled");
-            }
-            if (nbAttempt > 2 && openTimeout < 
EnvironmentEdgeManager.currentTime()) {
-              LOG.error("Can't open after " + nbAttempt + " attempts and "
-                  + (EnvironmentEdgeManager.currentTime() - startWaiting) + 
"ms " + " for " + path);
-            } else {
-              try {
-                Thread.sleep(nbAttempt < 3 ? 500 : 1000);
-                continue; // retry
-              } catch (InterruptedException ie) {
-                InterruptedIOException iioe = new InterruptedIOException();
-                iioe.initCause(ie);
-                throw iioe;
+          // Only inspect the Exception to consider retry when it's an 
IOException
+          if (e instanceof IOException) {
+            String msg = e.getMessage();
+            if (msg != null
+                && (msg.contains("Cannot obtain block length")
+                    || msg.contains("Could not obtain the last block") || msg
+                      .matches("Blocklist for [^ ]* has changed.*"))) {
+              if (++nbAttempt == 1) {
+                LOG.warn("Lease should have recovered. This is not expected. 
Will retry", e);
+              }
+              if (reporter != null && !reporter.progress()) {
+                throw new InterruptedIOException("Operation is cancelled");
               }
+              if (nbAttempt > 2 && openTimeout < 
EnvironmentEdgeManager.currentTime()) {
+                LOG.error("Can't open after " + nbAttempt + " attempts and "
+                    + (EnvironmentEdgeManager.currentTime() - startWaiting) + 
"ms " + " for " + path);
+              } else {
+                try {
+                  Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+                  continue; // retry
+                } catch (InterruptedException ie) {
+                  InterruptedIOException iioe = new InterruptedIOException();
+                  iioe.initCause(ie);
+                  throw iioe;
+                }
+              }
+              throw new LeaseNotRecoveredException(e);
             }
-            throw new LeaseNotRecoveredException(e);
-          } else {
-            throw e;
           }
+
+          // Rethrow the original exception if we are not retrying due to 
HDFS-isms.
+          throw e;
         }
       }
     } catch (IOException ie) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
new file mode 100644
index 0000000..fb729f5
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Create a non-abstract "proxy" for FileSystem because FileSystem is an
+ * abstract class and not an interface. Only interfaces can be used with the
+ * Java Proxy class to override functionality via an InvocationHandler.
+ * <p>
+ * Every operation overridden here is forwarded unchanged to the wrapped FileSystem, so a test
+ * can subclass this (typically anonymously) and override only the calls it wants to observe.
+ * The proxy itself is never {@code initialize()}d; its Configuration is taken from the wrapped
+ * FileSystem (see {@link #getConf()}).
+ */
+public class FileSystemProxy extends FileSystem {
+  private final FileSystem real;
+
+  /**
+   * @param real the FileSystem every call is forwarded to
+   */
+  public FileSystemProxy(FileSystem real) {
+    this.real = real;
+  }
+
+  /**
+   * Returns the wrapped FileSystem's Configuration. This proxy never goes through
+   * {@code FileSystem#initialize}, so without this override the inherited Configuration would be
+   * null and inherited convenience methods that consult {@code getConf()} (e.g. the
+   * {@code create(Path)} overloads) would fail with a NullPointerException.
+   */
+  @Override
+  public Configuration getConf() {
+    return real.getConf();
+  }
+
+  @Override
+  public FSDataInputStream open(Path p) throws IOException {
+    return real.open(p);
+  }
+
+  @Override
+  public URI getUri() {
+    return real.getUri();
+  }
+
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    return real.open(f, bufferSize);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
+      int bufferSize, short replication, long blockSize, Progressable progress)
+      throws IOException {
+    return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+      throws IOException {
+    return real.append(f, bufferSize, progress);
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    return real.rename(src, dst);
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    return real.delete(f, recursive);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+    return real.listStatus(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    real.setWorkingDirectory(newDir);
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return real.getWorkingDirectory();
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return real.mkdirs(f, permission);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return real.getFileStatus(f);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 99c4f3b..8da9bc7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -26,11 +26,16 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -51,10 +56,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -749,4 +757,125 @@ public class TestWALFactory {
     WALProvider metaWALProvider = walFactory.getMetaProvider();
     assertEquals(IOTestProvider.class, metaWALProvider.getClass());
   }
+
+  @Test
+  public void testReaderClosedOnBadCodec() throws IOException {
+    // Create our own Configuration and WALFactory to avoid breaking other test methods
+    Configuration confWithCodec = new Configuration(conf);
+    confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class,
+      Codec.class);
+    WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
+
+    // Hack a Proxy over the FileSystem so that we can track the InputStreams opened by
+    // the FileSystem and know if close() was called on those InputStreams.
+    List<InputStreamProxy> openedReaders = new ArrayList<>();
+    FileSystemProxy proxyFs = new FileSystemProxy(fs) {
+      @Override
+      public FSDataInputStream open(Path p) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p));
+        openedReaders.add(is);
+        return is;
+      }
+
+      @Override
+      public FSDataInputStream open(Path p, int bufferSize) throws IOException {
+        InputStreamProxy is = new InputStreamProxy(super.open(p, bufferSize));
+        openedReaders.add(is);
+        return is;
+      }
+    };
+
+    final TableDescriptor htd =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
+            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
+    final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+
+    NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (byte[] fam : htd.getColumnFamilyNames()) {
+      scopes.put(fam, 0);
+    }
+    byte[] row = Bytes.toBytes("row");
+    WAL.Reader reader = null;
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+    try {
+      // Write one column in one edit.
+      WALEdit cols = new WALEdit();
+      cols.add(new KeyValue(row, Bytes.toBytes("column"),
+        Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
+      final WAL log = customFactory.getWAL(hri);
+      final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+        htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
+      // Sync the edit to the WAL
+      log.sync(txid);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+      log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+      log.shutdown();
+
+      // Inject our failure, object is constructed via reflection.
+      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+      // Now open a reader on the log which will throw an exception when
+      // we try to instantiate the custom Codec.
+      Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+      try {
+        reader = customFactory.createReader(proxyFs, filename);
+        fail("Expected to see an exception when creating WAL reader");
+      } catch (Exception expected) {
+        // Expected: the injected codec failure must surface to the caller.
+      }
+      // We should have exactly one reader
+      assertEquals(1, openedReaders.size());
+      // And that reader should be closed.
+      long unclosedReaders = openedReaders.stream()
+          .filter((r) -> !r.isClosed.get())
+          .collect(Collectors.counting());
+      assertEquals("Should not find any open readers", 0, unclosedReaders);
+    } finally {
+      // The failure switch is static; reset it so it cannot leak into re-runs of this test
+      // (the WAL writer would otherwise fail to instantiate the codec too).
+      BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(false);
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } finally {
+        // Release the WAL providers owned by the private factory created above.
+        customFactory.close();
+      }
+    }
+  }
+
+  /**
+   * FSDataInputStream wrapper that records whether {@link #close()} was invoked, letting a test
+   * verify that a stream handed out by the FileSystem was released.
+   */
+  private static class InputStreamProxy extends FSDataInputStream {
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final InputStream delegate;
+
+    public InputStreamProxy(InputStream wrapped) {
+      super(wrapped);
+      delegate = wrapped;
+    }
+
+    /**
+     * Marks this proxy as closed first, then closes the wrapped stream.
+     */
+    @Override
+    public void close() throws IOException {
+      isClosed.set(true);
+      delegate.close();
+    }
+  }
+
+  /**
+   * A WALCellCodec whose constructors throw once {@link #THROW_FAILURE_ON_INIT} is switched on,
+   * so a test can make the reflective instantiation of the codec fail on demand.
+   */
+  @SuppressWarnings("unused")
+  private static class BrokenWALCellCodec extends WALCellCodec {
+    static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
+
+    public BrokenWALCellCodec() {
+      super();
+      maybeInjectFailure();
+    }
+
+    public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
+      super(conf, compression);
+      maybeInjectFailure();
+    }
+
+    /** Throws a RuntimeException when failure injection is enabled; otherwise does nothing. */
+    static void maybeInjectFailure() {
+      if (!THROW_FAILURE_ON_INIT.get()) {
+        return;
+      }
+      throw new RuntimeException("Injected instantiation exception");
+    }
+  }
 }

Reply via email to