This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 50f50d5bce5 HBASE-28268 Provide option to skip wal while using 
TableOutputFormat (#6472)
50f50d5bce5 is described below

commit 50f50d5bce54e4696ed712ed24333d1041c2aecd
Author: raghu55n02 <[email protected]>
AuthorDate: Thu Nov 14 18:20:04 2024 +0530

    HBASE-28268 Provide option to skip wal while using TableOutputFormat (#6472)
    
    Signed-off-by: Viraj Jasani <[email protected]>
---
 .../hadoop/hbase/mapreduce/TableOutputFormat.java  |  15 +++
 .../hbase/mapreduce/TestTableOutputFormat.java     | 130 +++++++++++++++++++++
 2 files changed, 145 insertions(+)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index a8ec67c9b23..109aeef4ce0 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.BufferedMutator;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -53,6 +54,15 @@ public class TableOutputFormat<KEY> extends 
OutputFormat<KEY, Mutation> implemen
   /** Job parameter that specifies the output table. */
   public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
 
+  /** Property value to use write-ahead logging */
+  public static final boolean WAL_ON = true;
+
+  /** Property value to disable write-ahead logging */
+  public static final boolean WAL_OFF = false;
+
+  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
+  public static final String WAL_PROPERTY = 
"hbase.mapreduce.tableoutputformat.write.wal";
+
   /**
    * Prefix for configuration property overrides to apply in {@link 
#setConf(Configuration)}. For
    * keys matching this prefix, the prefix is stripped, and the value is set 
in the configuration
@@ -98,6 +108,7 @@ public class TableOutputFormat<KEY> extends 
OutputFormat<KEY, Mutation> implemen
 
     private Connection connection;
     private BufferedMutator mutator;
+    boolean useWriteAheadLogging;
 
     /**
      *
@@ -108,6 +119,7 @@ public class TableOutputFormat<KEY> extends 
OutputFormat<KEY, Mutation> implemen
       this.connection = ConnectionFactory.createConnection(conf);
       this.mutator = 
connection.getBufferedMutator(TableName.valueOf(tableName));
       LOG.info("Created table instance for " + tableName);
+      this.useWriteAheadLogging = conf.getBoolean(WAL_PROPERTY, WAL_ON);
     }
 
     /**
@@ -141,6 +153,9 @@ public class TableOutputFormat<KEY> extends 
OutputFormat<KEY, Mutation> implemen
       if (!(value instanceof Put) && !(value instanceof Delete)) {
         throw new IOException("Pass a Delete or a Put");
       }
+      if (!useWriteAheadLogging) {
+        value.setDurability(Durability.SKIP_WAL);
+      }
       mutator.mutate(value);
     }
   }
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
new file mode 100644
index 00000000000..801099819b7
--- /dev/null
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableOutputFormat.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import javax.validation.constraints.Null;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+/**
+ * Simple Tests to check whether the durability of the Mutation is changed or 
not, for
+ * {@link TableOutputFormat} if {@link TableOutputFormat#WAL_PROPERTY} is set 
to false.
+ */
+@Category(MediumTests.class)
+public class TestTableOutputFormat {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTableOutputFormat.class);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final TableName TABLE_NAME = TableName.valueOf("TEST_TABLE");
+  private static final byte[] columnFamily = Bytes.toBytes("f");
+  private static Configuration conf;
+  private static RecordWriter<Null, Mutation> writer;
+  private static TaskAttemptContext context;
+  private static TableOutputFormat<Null> tableOutputFormat;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    util.startMiniCluster();
+    util.createTable(TABLE_NAME, columnFamily);
+
+    conf = new Configuration(util.getConfiguration());
+    context = Mockito.mock(TaskAttemptContext.class);
+    tableOutputFormat = new TableOutputFormat<>();
+    conf.set(TableOutputFormat.OUTPUT_TABLE, "TEST_TABLE");
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @After
+  public void close() throws IOException, InterruptedException {
+    if (writer != null && context != null) {
+      writer.close(context);
+    }
+  }
+
+  @Test
+  public void testTableOutputFormatWhenWalIsOFFForPut() throws IOException, 
InterruptedException {
+    // setting up the configuration for the TableOutputFormat, with writing to 
the WAL off.
+    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
+    tableOutputFormat.setConf(conf);
+
+    writer = tableOutputFormat.getRecordWriter(context);
+
+    // creating mutation of the type put
+    Put put = new Put("row1".getBytes());
+    put.addColumn(columnFamily, Bytes.toBytes("aa"), Bytes.toBytes("value"));
+
+    // verifying whether durability of mutation is USE_DEFAULT or not, before 
commiting write.
+    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", 
Durability.USE_DEFAULT,
+      put.getDurability());
+
+    writer.write(null, put);
+
+    // verifying whether durability of mutation got changed to the SKIP_WAL or 
not.
+    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", 
Durability.SKIP_WAL,
+      put.getDurability());
+  }
+
+  @Test
+  public void testTableOutputFormatWhenWalIsOFFForDelete()
+    throws IOException, InterruptedException {
+    // setting up the configuration for the TableOutputFormat, with writing to 
the WAL off.
+    conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
+    tableOutputFormat.setConf(conf);
+
+    writer = tableOutputFormat.getRecordWriter(context);
+
+    // creating mutation of the type delete
+    Delete delete = new Delete("row2".getBytes());
+    delete.addColumn(columnFamily, Bytes.toBytes("aa"));
+
+    // verifying whether durability of mutation is USE_DEFAULT or not, before 
commiting write.
+    Assert.assertEquals("Durability of the mutation should be USE_DEFAULT", 
Durability.USE_DEFAULT,
+      delete.getDurability());
+
+    writer.write(null, delete);
+
+    // verifying whether durability of mutation got changed from USE_DEFAULT 
to the SKIP_WAL or not.
+    Assert.assertEquals("Durability of the mutation should be SKIP_WAL", 
Durability.SKIP_WAL,
+      delete.getDurability());
+  }
+}

Reply via email to