This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f40c745 HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
f40c745 is described below
commit f40c745499659cb6c06f2f5b98996f035beee046
Author: langdamao <[email protected]>
AuthorDate: Wed Oct 9 07:42:54 2019 +0800
HBASE-22887 Fix HFileOutputFormat2 writer roll (#554)
Signed-off-by: langdamao <[email protected]>
---
.../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 14 +++++--------
.../hbase/mapreduce/TestHFileOutputFormat2.java | 23 +++++++++++++++++++++-
2 files changed, 27 insertions(+), 10 deletions(-)
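For context, as the diff below suggests: the old code kept a single rollRequested flag and a single previousRow shared by all column families, so once any one family's writer crossed the size limit, the next row boundary could roll writers for every family, not just the one that filled up. The patch tracks the previous row per family and rolls a writer only when that family's own writer has exceeded the limit and its row has changed, so a row is never split across files. A condensed, hedged sketch of the new write-path check (names taken from the patch; the surrounding writer lookup and creation code is omitted):

    // wl is this family's WriterLength (bytes written so far + open writer);
    // maxsize is the limit the test configures via HConstants.HREGION_MAX_FILESIZE.
    if (wl != null && wl.written + length >= maxsize
        && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
      rollWriters(wl); // the global rollRequested flag is gone (removed below)
    }
    ...
    wl.written += length;
    this.previousRows.put(family, rowKey); // remember the last row seen per family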
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 32531e6..ebdf9cd 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -247,9 +247,9 @@ public class HFileOutputFormat2
// Map of families to writers and how much has been output on the writer.
private final Map<byte[], WriterLength> writers =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
- private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
+ private final Map<byte[], byte[]> previousRows =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final long now = EnvironmentEdgeManager.currentTime();
- private boolean rollRequested = false;
@Override
public void write(ImmutableBytesWritable row, V cell)
@@ -291,12 +291,9 @@ public class HFileOutputFormat2
configureStoragePolicy(conf, fs, tableAndFamily, writerPath);
}
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
- }
-
// This can only happen once a row is finished though
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
+ if (wl != null && wl.written + length >= maxsize
+ && Bytes.compareTo(this.previousRows.get(family), rowKey) != 0) {
rollWriters(wl);
}
@@ -354,7 +351,7 @@ public class HFileOutputFormat2
wl.written += length;
// Copy the row so we know when a row transition.
- this.previousRow = rowKey;
+ this.previousRows.put(family, rowKey);
}
private void rollWriters(WriterLength writerLength) throws IOException {
@@ -365,7 +362,6 @@ public class HFileOutputFormat2
closeWriter(wl);
}
}
- this.rollRequested = false;
}
private void closeWriter(WriterLength wl) throws IOException {
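A side note on the new previousRows field: byte[] keys in Java use identity-based equals/hashCode and do not implement Comparable, so the map has to be a TreeMap with an explicit Bytes.BYTES_COMPARATOR (the same pattern already used for the writers map), otherwise a lookup with a freshly cloned family array would never match. A minimal standalone illustration, independent of the patch:

    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ByteArrayKeyDemo {
      public static void main(String[] args) {
        // Content-based ordering for byte[] keys, as in the patch's previousRows map.
        TreeMap<byte[], byte[]> previousRows = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        previousRows.put(Bytes.toBytes("info"), Bytes.toBytes("row-001"));
        // A different byte[] instance with the same content still finds the entry.
        System.out.println(Bytes.toString(previousRows.get(Bytes.toBytes("info"))));
      }
    }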
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 92600f8..d4c3802 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -424,7 +424,8 @@ public class TestHFileOutputFormat2 {
// Set down this value or we OOME in eclipse.
conf.setInt("mapreduce.task.io.sort.mb", 20);
// Write a few files.
- conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
+ long hregionMaxFilesize = 10 * 1024;
+ conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
Job job = new Job(conf, "testWritingPEData");
setupRandomGeneratorMapper(job, false);
@@ -451,6 +452,26 @@ public class TestHFileOutputFormat2 {
assertTrue(job.waitForCompletion(false));
FileStatus [] files = fs.listStatus(testDir);
assertTrue(files.length > 0);
+
+ //check output file num and size.
+ for (byte[] family : FAMILIES) {
+ long kvCount= 0;
+ RemoteIterator<LocatedFileStatus> iterator =
+ fs.listFiles(testDir.suffix("/" + new String(family)), true);
+ while (iterator.hasNext()) {
+ LocatedFileStatus keyFileStatus = iterator.next();
+ HFile.Reader reader =
+ HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
+ HFileScanner scanner = reader.getScanner(false, false, false);
+
+ kvCount += reader.getEntries();
+ scanner.seekTo();
+ long perKVSize = scanner.getCell().getSerializedSize();
+ assertTrue("Data size of each file should not be too large.",
+ perKVSize * reader.getEntries() <= hregionMaxFilesize);
+ }
+ assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
+ }
}
/**