This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 74eff79  HBASE-26273 Force ReadType.STREAM when the user does not 
explicitly set a ReadType on the Scan for a Snapshot-based Job
74eff79 is described below

commit 74eff79c274ccf40a275a2cf8d04bf8319f4fed5
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Sep 10 16:24:13 2021 -0400

    HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a 
ReadType on the Scan for a Snapshot-based Job
    
    HBase 2 moved over Scans to use PREAD by default instead of STREAM like
    HBase 1. In the context of a MapReduce job, we can generally expect that
    clients using the InputFormat (batch job) would be reading most of the
    data for a job. Cater to them, but still give users who want PREAD the
    ability to do so.
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
---
 .../mapreduce/TableSnapshotInputFormatImpl.java    | 18 ++++++++++++
 .../mapreduce/TestTableSnapshotInputFormat.java    | 33 ++++++++++++++++++++++
 2 files changed, 51 insertions(+)

diff --git 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index c3f05f4..f83a9b9 100644
--- 
a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ 
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -129,6 +130,14 @@ public class TableSnapshotInputFormatImpl {
   public static final boolean 
SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
 
   /**
+   * The {@link ReadType} which should be set on the {@link Scan} to read the 
HBase Snapshot,
+   * default STREAM.
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
+      "hbase.TableSnapshotInputFormat.scanner.readtype";
+  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = 
ReadType.STREAM;
+
+  /**
    * Implementation class for InputSplit logic common between mapred and 
mapreduce.
    */
   public static class InputSplit implements Writable {
@@ -382,6 +391,15 @@ public class TableSnapshotInputFormatImpl {
     } else {
       throw new IllegalArgumentException("Unable to create scan");
     }
+
+    if (scan.getReadType() == ReadType.DEFAULT) {
+      LOG.info("Provided Scan has DEFAULT ReadType,"
+          + " updating STREAM for Snapshot-based InputFormat");
+      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the 
default case.
+      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
+          SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
+    }
+
     return scan;
   }
 
diff --git 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 0820f3b..b1a07f0 100644
--- 
a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ 
b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -22,6 +22,8 @@ import static 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNA
 import static 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
 import static 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
+import static 
org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -407,6 +410,36 @@ public class TestTableSnapshotInputFormat extends 
TableSnapshotInputFormatTestBa
     }
   }
 
+  @Test
+  public void testScannerReadTypeConfiguration() throws IOException {
+    Configuration conf = new Configuration(false);
+    // Explicitly set ReadTypes should persist
+    for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
+      Scan scanWithReadType = new Scan();
+      scanWithReadType.setReadType(readType);
+      assertEquals(scanWithReadType.getReadType(),
+          serializeAndReturn(conf, scanWithReadType).getReadType());
+    }
+    // We should only see the DEFAULT ReadType getting updated to STREAM.
+    Scan scanWithoutReadType = new Scan();
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.STREAM, serializeAndReturn(conf, 
scanWithoutReadType).getReadType());
+
+    // We should still be able to force a certain ReadType when DEFAULT is 
given.
+    conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
+    assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+    assertEquals(ReadType.PREAD, serializeAndReturn(conf, 
scanWithoutReadType).getReadType());
+  }
+
+  /**
+   * Serializes and deserializes the given scan in the same manner that
+   * TableSnapshotInputFormat does.
+   */
+  private Scan serializeAndReturn(Configuration conf, Scan s) throws 
IOException {
+    conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
+    return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
+  }
+
   private void verifyWithMockedMapReduce(Job job, int numRegions, int 
expectedNumSplits,
       byte[] startRow, byte[] stopRow)
       throws IOException, InterruptedException {

Reply via email to