This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new f40c745  HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
f40c745 is described below

commit f40c745499659cb6c06f2f5b98996f035beee046
Author: langdamao <[email protected]>
AuthorDate: Wed Oct 9 07:42:54 2019 +0800

    HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
    
    Signed-off-by: langdamao <[email protected]>
---
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 14 +++++--------
 .../hbase/mapreduce/TestHFileOutputFormat2.java    | 23 +++++++++++++++++++++-
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 32531e6..ebdf9cd 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -247,9 +247,9 @@ public class HFileOutputFormat2
       // Map of families to writers and how much has been output on the writer.
       private final Map<byte[], WriterLength> writers =
               new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+      private final Map<byte[], byte[]> previousRows =
+              new TreeMap<>(Bytes.BYTES_COMPARATOR);
       private final long now = EnvironmentEdgeManager.currentTime();
-      private boolean rollRequested = false;
 
       @Override
       public void write(ImmutableBytesWritable row, V cell)
@@ -291,12 +291,9 @@ public class HFileOutputFormat2
           configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
         }
 
-        if (wl != null && wl.written + length >= maxsize) {
-          this.rollRequested = true;
-        }
-
         // This can only happen once a row is finished though
-        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+        if (wl != null && wl.written + length >= maxsize
+                && Bytes.compareTo(this.previousRows.get(family), rowKey) != 
0) {
           rollWriters(wl);
         }
 
@@ -354,7 +351,7 @@ public class HFileOutputFormat2
         wl.written += length;
 
         // Copy the row so we know when a row transition.
-        this.previousRow = rowKey;
+        this.previousRows.put(family, rowKey);
       }
 
       private void rollWriters(WriterLength writerLength) throws IOException {
@@ -365,7 +362,6 @@ public class HFileOutputFormat2
             closeWriter(wl);
           }
         }
-        this.rollRequested = false;
       }
 
       private void closeWriter(WriterLength wl) throws IOException {
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 92600f8..d4c3802 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -424,7 +424,8 @@ public class TestHFileOutputFormat2  {
     // Set down this value or we OOME in eclipse.
     conf.setInt("mapreduce.task.io.sort.mb", 20);
     // Write a few files.
-    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+    long hregionMaxFilesize = 10 * 1024;
+    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
 
     Job job = new Job(conf, "testWritingPEData");
     setupRandomGeneratorMapper(job, false);
@@ -451,6 +452,26 @@ public class TestHFileOutputFormat2  {
     assertTrue(job.waitForCompletion(false));
     FileStatus [] files = fs.listStatus(testDir);
     assertTrue(files.length > 0);
+
+    //check output file num and size.
+    for (byte[] family : FAMILIES) {
+      long kvCount= 0;
+      RemoteIterator<LocatedFileStatus> iterator =
+              fs.listFiles(testDir.suffix("/" + new String(family)), true);
+      while (iterator.hasNext()) {
+        LocatedFileStatus keyFileStatus = iterator.next();
+        HFile.Reader reader =
+                HFile.createReader(fs, keyFileStatus.getPath(), new 
CacheConfig(conf), true, conf);
+        HFileScanner scanner = reader.getScanner(false, false, false);
+
+        kvCount += reader.getEntries();
+        scanner.seekTo();
+        long perKVSize = scanner.getCell().getSerializedSize();
+        assertTrue("Data size of each file should not be too large.",
+                perKVSize * reader.getEntries() <= hregionMaxFilesize);
+      }
+      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, 
kvCount);
+    }
   }
 
   /**

Reply via email to