VladRodionov commented on a change in pull request #921: HBASE-22749:
Distributed MOB compactions
URL: https://github.com/apache/hbase/pull/921#discussion_r367166696
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
##########
@@ -183,105 +262,185 @@ protected boolean performCompaction(FileDetails fd,
InternalScanner scanner, Cel
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(),
store.getColumnFamilyName());
byte[] fileName = null;
- StoreFileWriter mobFileWriter = null, delFileWriter = null;
- long mobCells = 0, deleteMarkersCount = 0;
+ StoreFileWriter mobFileWriter = null;
+ /*
+ * mobCells are used only to decide if we need to commit or abort current
MOB output file.
+ */
+ long mobCells = 0;
long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;
+
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
- KeyValueScanner kvs = (scanner instanceof KeyValueScanner)?
(KeyValueScanner)scanner : null;
- long shippedCallSizeLimit = (long) numofFilesToCompact *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ?
(KeyValueScanner) scanner : null;
+ long shippedCallSizeLimit =
+ (long) numofFilesToCompact *
this.store.getColumnFamilyDescriptor().getBlocksize();
+
+ Cell mobCell = null;
try {
- try {
- // If the mob file writer could not be created, directly write the
cell to the store file.
- mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount,
- compactionCompression, store.getRegionInfo().getStartKey(), true);
- fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
- } catch (IOException e) {
- LOG.warn("Failed to create mob writer, "
- + "we will continue the compaction by writing MOB cells
directly in store files", e);
- }
- if (major) {
- try {
- delFileWriter = mobStore.createDelFileWriterInTmp(new
Date(fd.latestPutTs),
- fd.maxKeyCount, compactionCompression,
store.getRegionInfo().getStartKey());
- } catch (IOException e) {
- LOG.warn(
- "Failed to create del writer, "
- + "we will continue the compaction by writing delete markers
directly in store files",
- e);
- }
- }
+
+ mobFileWriter = newMobWriter(fd);
+ fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
+
do {
hasMore = scanner.next(cells, scannerContext);
- if (LOG.isDebugEnabled()) {
- now = EnvironmentEdgeManager.currentTime();
- }
+ now = EnvironmentEdgeManager.currentTime();
for (Cell c : cells) {
- if (major && CellUtil.isDelete(c)) {
- if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
- // Directly write it to a store file
- writer.append(c);
+ if (compactMOBs) {
+ if (MobUtils.isMobReferenceCell(c)) {
+ String fName = MobUtils.getMobFileName(c);
+ Path pp = new Path(new Path(fs.getUri()), new Path(path, fName));
+
+ // Added to support migration
+ try {
+ mobCell = mobStore.resolve(c, true, false).getCell();
+ } catch (FileNotFoundException fnfe) {
+ if (discardMobMiss) {
+ LOG.debug("Missing MOB cell: file={} not found cell={}", pp,
c);
+ continue;
+ } else {
+ throw fnfe;
+ }
+ }
+
+ if (discardMobMiss && mobCell.getValueLength() == 0) {
+ LOG.error("Missing MOB cell value: file=" + pp + " cell=" +
mobCell);
+ continue;
+ } else if (mobCell.getValueLength() == 0) {
+ // TODO: what to do here? This is data corruption?
+ LOG.warn("Found 0 length MOB cell in a file={} cell={}", pp,
mobCell);
+ }
+
+ if (mobCell.getValueLength() > mobSizeThreshold) {
+ // put the mob data back to the MOB store file
+ PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
+ if (!ioOptimizedMode) {
+ mobFileWriter.append(mobCell);
+ mobCells++;
+ writer.append(
+ MobUtils.createMobRefCell(mobCell, fileName,
this.mobStore.getRefCellTags()));
+ } else {
+ // I/O optimized mode
+ // Check if MOB cell origin file size is
+ // greater than threshold
+ Long size = mobLengthMap.get().get(fName);
+ if (size == null) {
+ // FATAL error, abort compaction
+ String msg = String.format(
+ "Found unreferenced MOB file during compaction %s,
aborting.", fName);
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ // Can not be null
+ if (size < maxMobFileSize) {
+ // If the MOB cell's origin file is below the threshold,
+ // it gets compacted
+ mobFileWriter.append(mobCell);
+ // Update number of mobCells in a current mob writer
+ mobCells++;
+ writer.append(
+ MobUtils.createMobRefCell(mobCell, fileName,
this.mobStore.getRefCellTags()));
+ // Update total size of the output (we do not take into
account
+ // file compression yet)
+ long len = getLength(mobFileWriter);
+
+ if (len > maxMobFileSize) {
+ LOG.debug("Closing output MOB File, length={} file={}",
len,
+ Bytes.toString(fileName));
+ commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId,
mobCells, major);
+ mobFileWriter = newMobWriter(fd);
+ fileName =
Bytes.toBytes(mobFileWriter.getPath().getName());
+ mobCells = 0;
+ }
+ } else {
+ // We leave large MOB file as is (is not compacted),
+ // then we update set of MOB file references
+ // and append mob cell directly to the store's writer
+ mobRefSet.get().add(fName);
+ writer.append(mobCell);
+ }
+ }
+ } else {
+ // If MOB value is less than threshold, append it directly to
a store file
+ PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
+ writer.append(mobCell);
+ cellsCountCompactedFromMob++;
+ cellsSizeCompactedFromMob += mobCell.getValueLength();
+ }
} else {
- // Add a ref tag to this cell and write it to a store file.
- writer.append(MobUtils.createMobRefDeleteMarker(c));
- // Write the cell to a del file
- delFileWriter.append(c);
- deleteMarkersCount++;
+ // Not a MOB reference cell
+ int size = c.getValueLength();
+ if (size > mobSizeThreshold) {
+ // This MOB cell comes from a regular store file
+ // therefore we store it into original mob output
+ mobFileWriter.append(c);
+ writer
+ .append(MobUtils.createMobRefCell(c, fileName,
this.mobStore.getRefCellTags()));
+ mobCells++;
+ cellsCountCompactedToMob++;
+ cellsSizeCompactedToMob += c.getValueLength();
+ if (ioOptimizedMode) {
+ // Update total size of the output (we do not take into
account
+ // file compression yet)
+ long len = getLength(mobFileWriter);
+ if (len > maxMobFileSize) {
+ commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId,
mobCells, major);
+ mobFileWriter = newMobWriter(fd);
+ fileName =
Bytes.toBytes(mobFileWriter.getPath().getName());
+ mobCells = 0;
+ }
+ }
+ } else {
+ // Not a MOB cell, write it directly to a store file
+ writer.append(c);
+ }
}
- } else if (mobFileWriter == null || c.getTypeByte() !=
KeyValue.Type.Put.getCode()) {
- // If the mob file writer is null or the kv type is not put,
directly write the cell
+ } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
+ // Not a major compaction or major with MOB disabled
+ // If the kv type is not put, directly write the cell
// to the store file.
writer.append(c);
} else if (MobUtils.isMobReferenceCell(c)) {
+ // Not a major MOB compaction, Put MOB reference
if (MobUtils.hasValidMobRefCellValue(c)) {
- int size = MobUtils.getMobValueLength(c);
- if (size > mobSizeThreshold) {
- // If the value size is larger than the threshold, it's
regarded as a mob. Since
- // its value is already in the mob file, directly write this
cell to the store file
- writer.append(c);
- } else {
- // If the value is not larger than the threshold, it's not
regarded a mob. Retrieve
- // the mob cell from the mob file, and write it back to the
store file. Must
- // close the mob scanner once the life cycle finished.
- try (MobCell mobCell = mobStore.resolve(c, false)) {
- if (mobCell.getCell().getValueLength() != 0) {
- // put the mob data back to the store file
- PrivateCellUtil.setSequenceId(mobCell.getCell(),
c.getSequenceId());
- writer.append(mobCell.getCell());
- cellsCountCompactedFromMob++;
- cellsSizeCompactedFromMob +=
mobCell.getCell().getValueLength();
- } else {
- // If the value of a file is empty, there might be issues
when retrieving,
- // directly write the cell to the store file, and leave it
to be handled by the
- // next compaction.
- writer.append(c);
- }
- }
- }
+ // We do not check mobSizeThreshold during normal compaction,
+ // leaving it to a MOB compaction run
+ writer.append(c);
+ // Add MOB reference to a MOB reference set
+ mobRefSet.get().add(MobUtils.getMobFileName(c));
} else {
- LOG.warn("The value format of the KeyValue " + c
- + " is wrong, its length is less than " + Bytes.SIZEOF_INT);
+ // TODO ????
+ LOG.error("Corrupted MOB reference: " + c);
Review comment:
Yes, I changed the code to throw an IOException.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services