This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 50f50d5bce5 HBASE-28268 Provide option to skip wal while using
TableOutputFormat (#6472)
50f50d5bce5 is described below
commit 50f50d5bce54e4696ed712ed24333d1041c2aecd
Author: raghu55n02 <[email protected]>
AuthorDate: Thu Nov 14 18:20:04 2024 +0530
HBASE-28268 Provide option to skip wal while using TableOutputFormat (#6472)
Signed-off-by: Viraj Jasani <[email protected]>
---
.../hadoop/hbase/mapreduce/TableOutputFormat.java | 15 +++
.../hbase/mapreduce/TestTableOutputFormat.java | 130 +++++++++++++++++++++
2 files changed, 145 insertions(+)
diff --git
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index a8ec67c9b23..109aeef4ce0 100644
---
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
@@ -53,6 +54,15 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
/** Job parameter that specifies the output table. */
public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+ /** Value for {@link #WAL_PROPERTY} that keeps write-ahead logging on; used as the default. */
+ public static final boolean WAL_ON = true;
+
+ /** Value for {@link #WAL_PROPERTY} that turns write-ahead logging off. */
+ public static final boolean WAL_OFF = false;
+
+ /** Boolean job parameter; set it to {@link #WAL_OFF} to turn off write-ahead logging (WAL). */
+ public static final String WAL_PROPERTY =
"hbase.mapreduce.tableoutputformat.write.wal";
+
/**
* Prefix for configuration property overrides to apply in {@link
#setConf(Configuration)}. For
* keys matching this prefix, the prefix is stripped, and the value is set
in the configuration
@@ -98,6 +108,7 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
private Connection connection;
private BufferedMutator mutator;
+ private boolean useWriteAheadLogging; // when false, writes are sent with Durability.SKIP_WAL
/**
*
@@ -108,6 +119,7 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
this.connection = ConnectionFactory.createConnection(conf);
this.mutator =
connection.getBufferedMutator(TableName.valueOf(tableName));
LOG.info("Created table instance for " + tableName);
+ this.useWriteAheadLogging = conf.getBoolean(WAL_PROPERTY, WAL_ON);
}
/**
@@ -141,6 +153,9 @@ public class TableOutputFormat<KEY> extends
OutputFormat<KEY, Mutation> implemen
if (!(value instanceof Put) && !(value instanceof Delete)) {
throw new IOException("Pass a Delete or a Put");
}
+ if (!useWriteAheadLogging) {
+ value.setDurability(Durability.SKIP_WAL);
+ }
mutator.mutate(value);
}
}
diff --git
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
new file mode 100644
index 00000000000..801099819b7
--- /dev/null
+++
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import javax.validation.constraints.Null;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Checks that {@link TableOutputFormat} marks each Put/Delete it writes as
+ * {@link Durability#SKIP_WAL} when {@link TableOutputFormat#WAL_PROPERTY} is set to false.
+ */
+@Category(MediumTests.class)
+public class TestTableOutputFormat {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTableOutputFormat.class);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final TableName TABLE_NAME = TableName.valueOf("TEST_TABLE");
+  private static final byte[] columnFamily = Bytes.toBytes("f");
+  private static Configuration conf;
+  private static RecordWriter<Null, Mutation> writer;
+  private static TaskAttemptContext context;
+  private static TableOutputFormat<Null> tableOutputFormat;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util.startMiniCluster();
+    util.createTable(TABLE_NAME, columnFamily);
+
+    conf = new Configuration(util.getConfiguration());
+    context = Mockito.mock(TaskAttemptContext.class);
+    tableOutputFormat = new TableOutputFormat<>();
+    conf.set(TableOutputFormat.OUTPUT_TABLE, TABLE_NAME.getNameAsString());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @After
+  public void close() throws IOException, InterruptedException {
+    if (writer != null && context != null) {
+      writer.close(context);
+    }
+  }
+
+  @Test
+  public void testTableOutputFormatWhenWalIsOFFForPut() throws IOException, InterruptedException {
+    // Configure the TableOutputFormat so that writes skip the WAL.
+    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
+    tableOutputFormat.setConf(conf);
+
+    writer = tableOutputFormat.getRecordWriter(context);
+
+    // Build a Put; Bytes.toBytes encodes the row key as UTF-8 on every platform.
+    Put put = new Put(Bytes.toBytes("row1"));
+    put.addColumn(columnFamily, Bytes.toBytes("aa"), Bytes.toBytes("value"));
+
+    // Before the write is committed the mutation must still carry the default durability.
+    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
+      put.getDurability());
+
+    writer.write(null, put);
+
+    // Writing with the WAL switched off must have changed the durability to SKIP_WAL.
+    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
+      put.getDurability());
+  }
+
+  @Test
+  public void testTableOutputFormatWhenWalIsOFFForDelete()
+    throws IOException, InterruptedException {
+    // Configure the TableOutputFormat so that writes skip the WAL.
+    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
+    tableOutputFormat.setConf(conf);
+
+    writer = tableOutputFormat.getRecordWriter(context);
+
+    // Build a Delete; Bytes.toBytes encodes the row key as UTF-8 on every platform.
+    Delete delete = new Delete(Bytes.toBytes("row2"));
+    delete.addColumn(columnFamily, Bytes.toBytes("aa"));
+
+    // Before the write is committed the mutation must still carry the default durability.
+    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", Durability.USE_DEFAULT,
+      delete.getDurability());
+
+    writer.write(null, delete);
+
+    // Writing with the WAL switched off must have changed the durability to SKIP_WAL.
+    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", Durability.SKIP_WAL,
+      delete.getDurability());
+  }
+}