This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a1d59bf HBASE-25994 Active WAL tailing fails when WAL value compression is enabled (#3377)
a1d59bf is described below
commit a1d59bf1a569aa14c87da444e95d166580307c79
Author: Andrew Purtell <[email protected]>
AuthorDate: Mon Jun 14 17:16:31 2021 -0700
HBASE-25994 Active WAL tailing fails when WAL value compression is enabled (#3377)
Depending on which compression codec is used, a short read of the
compressed bytes can cause catastrophic errors that confuse the WAL reader.
This problem can manifest when the reader is actively tailing the WAL for
replication. To avoid these issues when WAL value compression is enabled,
BoundedDelegatingInputStream.available() now assumes that enough bytes are
available to supply a reader up to its bound, instead of querying the
underlying stream. This is permitted by the contract of available(), which
provides only an _estimate_ of available bytes. It is equivalent to
IOUtils.readFully, but without requiring an intermediate buffer.
Added TestReplicationCompressedWAL and TestReplicationValueCompressedWAL.
Without the BoundedDelegatingInputStream change, TestReplicationValueCompressedWAL
fails.
Signed-off-by: Bharath Vissapragada <[email protected]>
---
.../hbase/io/BoundedDelegatingInputStream.java | 14 ++-
.../regionserver/TestReplicationCompressedWAL.java | 108 +++++++++++++++++++++
.../TestReplicationValueCompressedWAL.java | 59 +++++++++++
3 files changed, 176 insertions(+), 5 deletions(-)
diff --git
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
index e9a3b67..d7db6a3 100644
---
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
+++
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
@@ -95,17 +95,21 @@ public class BoundedDelegatingInputStream extends
DelegatingInputStream {
}
/**
- * Call the delegate's {@code available()} method.
- * @return the delegate's available bytes if the current position is less
than the limit,
- * or 0 otherwise
+ * @return the remaining bytes within the bound if the current position is
less than the
+ * limit, or 0 otherwise.
*/
@Override
public int available() throws IOException {
if (pos >= limit) {
return 0;
}
- int available = in.available();
- return (int) Math.min(available, limit - pos);
+ // Do not call the delegate's available() method. Data in a bounded input
stream is assumed
+ // available up to the limit and that is the contract we have with our
callers. Regardless
+ // of what we do here, read() and skip() will behave as expected when EOF
is encountered if
+ // the underlying stream is closed early or otherwise could not provide
enough bytes.
+ // Note: This class is used to supply buffers to compression codecs during
WAL tailing and
+ // successful decompression depends on this behavior.
+ return (int) (limit - pos);
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
new file mode 100644
index 0000000..62fc4a3
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationCompressedWAL.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+public class TestReplicationCompressedWAL extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationCompressedWAL.class);
+
+ static final Logger LOG =
LoggerFactory.getLogger(TestReplicationCompressedWAL.class);
+ static final int NUM_BATCHES = 20;
+ static final int NUM_ROWS_PER_BATCH = 100;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Test
+ public void testMultiplePuts() throws Exception {
+ runMultiplePutTest();
+ }
+
+ protected static void runMultiplePutTest() throws IOException,
InterruptedException {
+ for (int i = 0; i < NUM_BATCHES; i++) {
+ putBatch(i);
+ getBatch(i);
+ }
+ }
+
+ protected static void getBatch(int batch) throws IOException,
InterruptedException {
+ for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
+ byte[] row = getRowKey(batch, i);
+ Get get = new Get(row);
+ for (int j = 0; j < NB_RETRIES; j++) {
+ if (j == NB_RETRIES - 1) {
+ fail("Waited too much time for replication");
+ }
+ Result res = htable2.get(get);
+ if (res.isEmpty()) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(row, res.value());
+ break;
+ }
+ }
+ }
+ }
+
+ protected static byte[] getRowKey(int batch, int count) {
+ return Bytes.toBytes("row" + ((batch * NUM_ROWS_PER_BATCH) + count));
+ }
+
+ protected static void putBatch(int batch) throws IOException {
+ for (int i = 0; i < NUM_ROWS_PER_BATCH; i++) {
+ byte[] row = getRowKey(batch, i);
+ Put put = new Put(row);
+ put.addColumn(famName, row, row);
+ htable1.put(put);
+ }
+ }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
new file mode 100644
index 0000000..00bf7dc
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationValueCompressedWAL.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+public class TestReplicationValueCompressedWAL extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationValueCompressedWAL.class);
+
+ static final Logger LOG =
LoggerFactory.getLogger(TestReplicationValueCompressedWAL.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ CONF1.setBoolean(CompressionContext.ENABLE_WAL_VALUE_COMPRESSION, true);
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Test
+ public void testMultiplePuts() throws Exception {
+ TestReplicationCompressedWAL.runMultiplePutTest();
+ }
+
+}