Repository: hbase Updated Branches: refs/heads/master 478bdc8ea -> 125f5619b
HBASE-11777 Find a way to set sequenceId on Cells on the server. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/125f5619 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/125f5619 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/125f5619 Branch: refs/heads/master Commit: 125f5619b9ba08ce5a44829ceab5177b53d943a9 Parents: 478bdc8 Author: anoopsjohn <[email protected]> Authored: Mon Sep 1 15:04:00 2014 +0530 Committer: anoopsjohn <[email protected]> Committed: Mon Sep 1 15:04:00 2014 +0530 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/CellUtil.java | 15 ++++++ .../java/org/apache/hadoop/hbase/KeyValue.java | 11 +--- .../apache/hadoop/hbase/SettableSequenceId.java | 34 ++++++++++++ .../io/encoding/BufferedDataBlockEncoder.java | 23 ++++++--- .../hadoop/hbase/regionserver/HRegion.java | 54 ++++++++++---------- .../hadoop/hbase/regionserver/HStore.java | 8 +-- .../MultiVersionConsistencyControl.java | 6 +-- .../hadoop/hbase/regionserver/SequenceId.java | 31 +++++++++++ .../hbase/regionserver/SequenceNumber.java | 31 ----------- .../apache/hadoop/hbase/regionserver/Store.java | 12 ++--- .../hbase/regionserver/StoreFileScanner.java | 8 ++- .../regionserver/compactions/Compactor.java | 3 +- .../hadoop/hbase/regionserver/wal/FSHLog.java | 11 ++-- .../hbase/regionserver/wal/FSWALEntry.java | 18 ++++--- .../hadoop/hbase/regionserver/wal/HLog.java | 7 ++- .../hadoop/hbase/regionserver/wal/HLogKey.java | 6 +-- .../hadoop/hbase/regionserver/TestHRegion.java | 9 ++-- 17 files changed, 169 insertions(+), 118 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java ---------------------------------------------------------------------- diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 50e42c6..fd7d252 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -515,4 +515,19 @@ public final class CellUtil { && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, end1) < 0); } + + /** + * Sets the given seqId to the cell. + * @param cell + * @param seqId + * @throws IOException when the passed cell is not of type {@link SettableSequenceId} + */ + public static void setSequenceId(Cell cell, long seqId) throws IOException { + if (cell instanceof SettableSequenceId) { + ((SettableSequenceId) cell).setSequenceId(seqId); + } else { + throw new IOException(new UnsupportedOperationException("Cell is not of type " + + SettableSequenceId.class.getName())); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index b9aa2f3..e75147b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -78,7 +78,7 @@ import com.google.common.annotations.VisibleForTesting; * and actual tag bytes length. 
*/ @InterfaceAudience.Private -public class KeyValue implements Cell, HeapSize, Cloneable { +public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId { private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>(); static final Log LOG = LogFactory.getLog(KeyValue.class); @@ -1388,15 +1388,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable { } /** - * @return Column (family + qualifier) length - */ - private int getTotalColumnLength(int rlength, int foffset) { - int flength = getFamilyLength(foffset); - int qlength = getQualifierLength(rlength,flength); - return flength + qlength; - } - - /** * @return Timestamp offset */ public int getTimestampOffset() { http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java new file mode 100644 index 0000000..1aaccbd --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/SettableSequenceId.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Using this Interface one can mark a Cell as Sequence stampable. <br> + * Note : Make sure to make Cell implementation of this type in server side. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +public interface SettableSequenceId { + + /** + * Sets with the given seqId. + * @param seqId + */ + void setSequenceId(long seqId); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 5729870..4772358 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SettableSequenceId; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator; import org.apache.hadoop.hbase.KeyValue.Type; @@ -320,7 +321,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { * Note that the value byte[] part is still pointing to the currentBuffer and the * represented by the valueOffset and valueLength */ - protected static class ClonedSeekerState implements Cell { + // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId + // 
there. So this has to be an instance of SettableSequenceId. SeekerState need not be + // SettableSequenceId as we never return that to top layers. When we have to, we make + // ClonedSeekerState from it. + protected static class ClonedSeekerState implements Cell, SettableSequenceId { private byte[] keyOnlyBuffer; private ByteBuffer currentBuffer; private short rowLength; @@ -335,12 +340,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { private int tagsLength; private int tagsOffset; private byte[] cloneTagsBuffer; - private long memstoreTS; + private long seqId; private TagCompressionContext tagCompressionContext; protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength, int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength, - long timeStamp, byte typeByte, int valueLen, int valueOffset, long memStoreTS, + long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId, int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, byte[] tagsBuffer) { this.currentBuffer = currentBuffer; @@ -355,7 +360,6 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { this.typeByte = typeByte; this.valueLength = valueLen; this.valueOffset = valueOffset; - this.memstoreTS = memStoreTS; this.tagsOffset = tagsOffset; this.tagsLength = tagsLength; System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength); @@ -363,6 +367,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { this.cloneTagsBuffer = new byte[tagsLength]; System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength); } + setSequenceId(seqId); } @Override @@ -421,13 +426,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } @Override + @Deprecated public long getMvccVersion() { - return memstoreTS; + return getSequenceId(); } @Override public long getSequenceId() { - return memstoreTS; + return seqId; } @Override @@ -498,6 +504,11 @@ 
abstract class BufferedDataBlockEncoder implements DataBlockEncoder { } return kv.toString(); } + + @Override + public void setSequenceId(long seqId) { + this.seqId = seqId; + } } protected abstract static class http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 57377ad..04ad9a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1994,7 +1994,7 @@ public class HRegion implements HeapSize { // , Writable{ */ private long getNextSequenceId(final HLog wal) throws IOException { HLogKey key = this.appendNoSyncNoAppend(wal, null); - return key.getSequenceNumber(); + return key.getSequenceId(); } ////////////////////////////////////////////////////////////////////////////// @@ -2501,7 +2501,7 @@ public class HRegion implements HeapSize { // , Writable{ List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; - List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); + List<Cell> memstoreCells = new ArrayList<Cell>(); // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -3081,9 +3081,10 @@ public class HRegion implements HeapSize { // , Writable{ * @param output newly added KVs into memstore * @return the additional memory usage of the memstore caused by the * new entries. 
+ * @throws IOException */ private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, - long mvccNum, List<KeyValue> memstoreCells) { + long mvccNum, List<Cell> memstoreCells) throws IOException { long size = 0; for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { @@ -3092,9 +3093,8 @@ public class HRegion implements HeapSize { // , Writable{ Store store = getStore(family); for (Cell cell: cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kv.setSequenceId(mvccNum); - Pair<Long, Cell> ret = store.add(kv); + CellUtil.setSequenceId(cell, mvccNum); + Pair<Long, Cell> ret = store.add(cell); size += ret.getFirst(); memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); } @@ -3108,13 +3108,13 @@ public class HRegion implements HeapSize { // , Writable{ * called when a Put/Delete has updated memstore but subsequently fails to update * the wal. This method is then invoked to rollback the memstore. */ - private void rollbackMemstore(List<KeyValue> memstoreCells) { + private void rollbackMemstore(List<Cell> memstoreCells) { int kvsRolledback = 0; - for (KeyValue kv : memstoreCells) { - byte[] family = kv.getFamily(); + for (Cell cell : memstoreCells) { + byte[] family = cell.getFamily(); Store store = getStore(family); - store.rollback(kv); + store.rollback(cell); kvsRolledback++; } LOG.debug("rollbackMemstore rolled back " + kvsRolledback); @@ -5043,7 +5043,7 @@ public class HRegion implements HeapSize { // , Writable{ List<RowLock> acquiredRowLocks; long addedSize = 0; List<Mutation> mutations = new ArrayList<Mutation>(); - List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); + List<Cell> memstoreCells = new ArrayList<Cell>(); Collection<byte[]> rowsToLock = processor.getRowsToLock(); long mvccNum = 0; HLogKey walKey = null; @@ -5075,16 +5075,16 @@ public class HRegion implements HeapSize { // , Writable{ // 7. 
Apply to memstore for (Mutation m : mutations) { for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current()); - kv.setSequenceId(mvccNum); - Store store = getStore(kv); + Cell cell = cellScanner.current(); + CellUtil.setSequenceId(cell, mvccNum); + Store store = getStore(cell); if (store == null) { - checkFamily(CellUtil.cloneFamily(kv)); + checkFamily(CellUtil.cloneFamily(cell)); // unreachable } - Pair<Long, Cell> ret = store.add(kv); + Pair<Long, Cell> ret = store.add(cell); addedSize += ret.getFirst(); - memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + memstoreCells.add(ret.getSecond()); } } @@ -5242,7 +5242,7 @@ public class HRegion implements HeapSize { // , Writable{ WriteEntry w = null; HLogKey walKey = null; RowLock rowLock = null; - List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); + List<Cell> memstoreCells = new ArrayList<Cell>(); boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); @@ -5354,14 +5354,14 @@ public class HRegion implements HeapSize { // , Writable{ if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); + memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell: entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); Pair<Long, Cell> ret = store.add(kv); size += ret.getFirst(); - memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } @@ -5380,7 +5380,7 @@ public class HRegion implements HeapSize { // , Writable{ } else { recordMutationWithoutWal(append.getFamilyCellMap()); } - if(walKey == null){ + if (walKey == null) { // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = 
this.appendNoSyncNoAppend(this.log, memstoreCells); } @@ -5459,7 +5459,7 @@ public class HRegion implements HeapSize { // , Writable{ WriteEntry w = null; HLogKey walKey = null; long mvccNum = 0; - List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); + List<Cell> memstoreCells = new ArrayList<Cell>(); boolean doRollBackMemstore = false; try { rowLock = getRowLock(row); @@ -5575,14 +5575,14 @@ public class HRegion implements HeapSize { // , Writable{ if (store.getFamily().getMaxVersions() == 1) { // upsert if VERSIONS for this CF == 1 size += store.upsert(entry.getValue(), getSmallestReadPoint()); - memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); + memstoreCells.addAll(entry.getValue()); } else { // otherwise keep older versions around for (Cell cell : entry.getValue()) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); Pair<Long, Cell> ret = store.add(kv); size += ret.getFirst(); - memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + memstoreCells.add(ret.getSecond()); doRollBackMemstore = true; } } @@ -6373,12 +6373,12 @@ public class HRegion implements HeapSize { // , Writable{ * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore * the WALEdit append later. * @param wal - * @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to - * be updated with right mvcc values(their log sequence nu + * @param cells list of Cells inserted into memstore. Those Cells are passed in order to + * be updated with right mvcc values(their log sequence number) * @return Return the key used appending with no sync and no append. 
* @throws IOException */ - private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException { + private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException { HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); // Call append but with an empty WALEdit. The returned seqeunce id will not be associated http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index cbda424..8d20d7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -631,10 +631,10 @@ public class HStore implements Store { } @Override - public Pair<Long, Cell> add(final KeyValue kv) { + public Pair<Long, Cell> add(final Cell cell) { lock.readLock().lock(); try { - return this.memstore.add(kv); + return this.memstore.add(cell); } finally { lock.readLock().unlock(); } @@ -661,10 +661,10 @@ public class HStore implements Store { } @Override - public void rollback(final KeyValue kv) { + public void rollback(final Cell cell) { lock.readLock().lock(); try { - this.memstore.rollback(kv); + this.memstore.rollback(cell); } finally { lock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java index 0b5f5d0..23df7cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConsistencyControl.java @@ -114,11 +114,11 @@ public class MultiVersionConsistencyControl { * visible to MVCC readers. * @throws IOException */ - public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum) + public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) throws IOException { if(e == null) return; - if (seqNum != null) { - e.setWriteNumber(seqNum.getSequenceNumber()); + if (seqId != null) { + e.setWriteNumber(seqId.getSequenceId()); } else { // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside // function beginMemstoreInsertWithSeqNum in case of failures http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java new file mode 100644 index 0000000..6d0beda --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java @@ -0,0 +1,31 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Interface which abstracts implementations on log sequenceId assignment + */ +@InterfaceAudience.Private +public interface SequenceId { + public long getSequenceId() throws IOException; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java deleted file mode 100644 index 90b1029..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceNumber.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.regionserver; - -import java.io.IOException; -import org.apache.hadoop.classification.InterfaceAudience; - -/** - * Interface which abstracts implementations on log sequence number assignment - */ -@InterfaceAudience.Private -public interface SequenceNumber { - public long getSequenceNumber() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index d782913..4f0c5fb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -122,10 +122,10 @@ public interface Store extends HeapSize, StoreConfigInformation { /** * Adds a value to the memstore - * @param kv + * @param cell * @return memstore size delta & newly added KV which maybe different than the passed in KV */ - Pair<Long, Cell> add(KeyValue kv); + Pair<Long, Cell> add(Cell cell); /** * When was the last edit done in the memstore @@ -133,11 +133,11 @@ public interface Store extends HeapSize, StoreConfigInformation { long timeOfOldestEdit(); /** - * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the - * key & memstoreTS value of the kv parameter. 
- * @param kv + * Removes a Cell from the memstore. The Cell is removed only if its key & memstoreTS match the + * key & memstoreTS value of the cell parameter. + * @param cell */ - void rollback(final KeyValue kv); + void rollback(final Cell cell); /** * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING: http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 6474e96..aa351d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -198,12 +198,10 @@ public class StoreFileScanner implements KeyValueScanner { } } - protected void setCurrentCell(Cell newVal) { + protected void setCurrentCell(Cell newVal) throws IOException { this.cur = newVal; - if(this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) { - KeyValue curKV = KeyValueUtil.ensureKeyValue(cur); - curKV.setSequenceId(this.reader.getSequenceID()); - cur = curKV; + if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) { + CellUtil.setSequenceId(cur, this.reader.getSequenceID()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index fdc38c5..2b053a6 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; @@ -241,7 +242,7 @@ public abstract class Compactor { for (Cell c : kvs) { KeyValue kv = KeyValueUtil.ensureKeyValue(c); if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { - kv.setSequenceId(0); + CellUtil.setSequenceId(kv, 0); } writer.append(kv); ++progress.currentCompactedKVs; http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 99c6254..69667bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Syncable; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -1115,9 +1116,9 @@ class FSHLog implements HLog, Syncable { @Override public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key, final WALEdit edits, final AtomicLong 
sequenceId, final boolean inMemstore, - final List<KeyValue> memstoreKVs) + final List<Cell> memstoreCells) throws IOException { - return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs); + return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreCells); } /** @@ -1132,7 +1133,7 @@ class FSHLog implements HLog, Syncable { * @param sync shall we sync after we call the append? * @param inMemstore * @param sequenceId The region sequence id reference. - * @param memstoreKVs + * @param memstoreCells * @return txid of this transaction or if nothing to do, the last txid * @throws IOException */ @@ -1140,7 +1141,7 @@ class FSHLog implements HLog, Syncable { justification="Will never be null") private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key, WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore, - List<KeyValue> memstoreKVs) + List<Cell> memstoreCells) throws IOException { if (!this.enabled) return this.highestUnsyncedSequence; if (this.closed) throw new IOException("Cannot append; log is closed"); @@ -1158,7 +1159,7 @@ class FSHLog implements HLog, Syncable { // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. 
- entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs); + entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells); truck.loadPayload(entry, scope.detach()); } finally { this.disruptor.getRingBuffer().publish(sequence); http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index a9c2055..5081ab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hbase.regionserver.wal; +import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; /** * A WAL Entry for {@link FSHLog} implementation. Immutable. 
@@ -43,18 +44,18 @@ class FSWALEntry extends HLog.Entry { private final transient boolean inMemstore; private final transient HTableDescriptor htd; private final transient HRegionInfo hri; - private final transient List<KeyValue> memstoreKVs; + private final transient List<Cell> memstoreCells; FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, - final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) { + final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) { super(key, edit); this.regionSequenceIdReference = referenceToRegionSequenceId; this.inMemstore = inMemstore; this.htd = htd; this.hri = hri; this.sequence = sequence; - this.memstoreKVs = memstoreKVs; + this.memstoreCells = memstoreCells; } public String toString() { @@ -87,13 +88,14 @@ class FSWALEntry extends HLog.Entry { * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this * method to be called. * @return The region edit/sequence id we set for this edit. 
+ * @throws IOException * @see #getRegionSequenceId() */ - long stampRegionSequenceId() { + long stampRegionSequenceId() throws IOException { long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); - if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) { - for(KeyValue kv : this.memstoreKVs){ - kv.setSequenceId(regionSequenceId); + if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) { + for (Cell cell : this.memstoreCells) { + CellUtil.setSequenceId(cell, regionSequenceId); } } HLogKey key = getKey(); http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 8c72a17..1411455 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -34,10 +34,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.io.Writable; @@ -366,14 +366,13 @@ public interface HLog { * @param inMemstore Always true except for case where we are writing a compaction completion * record into the WAL; in this case the entry is just so we can finish an unfinished compaction * -- it is not an edit for 
memstore. - * @param memstoreKVs list of KVs added into memstore + * @param memstoreCells list of Cells added into memstore * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id * in it. * @throws IOException */ long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits, - AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs) - throws IOException; + AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreCells) throws IOException; // TODO: Do we need all these versions of sync? void hsync() throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 60eda98..2d1efe9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; -import org.apache.hadoop.hbase.regionserver.SequenceNumber; +import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.io.WritableComparable; @@ -69,7 +69,7 @@ import com.google.protobuf.ByteString; // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical // purposes. They need to be merged into HLogEntry. 
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) -public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber { +public class HLogKey implements WritableComparable<HLogKey>, SequenceId { public static final Log LOG = LogFactory.getLog(HLogKey.class); // should be < 0 (@see #readFields(DataInput)) @@ -278,7 +278,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber { * @throws InterruptedException */ @Override - public long getSequenceNumber() throws IOException { + public long getSequenceId() throws IOException { try { this.seqNumAssignedLatch.await(); } catch (InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/hbase/blob/125f5619/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 2d482b1..de7c5a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.RegionTooBusyException; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; @@ -970,7 +969,7 @@ public class TestHRegion { // throw exceptions if the WalEdit is a start flush action when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), - (List<KeyValue>)any())) + (List<Cell>)any())) 
.thenThrow(new IOException("Fail to append flush marker")); // start cache flush will throw exception @@ -4499,7 +4498,7 @@ public class TestHRegion { //verify append called or not verify(log, expectAppend ? times(1) : never()) .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any()); + (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<Cell>)any()); // verify sync called or not if (expectSync || expectSyncFromLogSyncer) { @@ -5524,7 +5523,7 @@ public class TestHRegion { TEST_UTIL.getConfiguration(), rss, null); verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any() - , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any()); + , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any()); WALEdit edit = editCaptor.getValue(); assertNotNull(edit); @@ -5588,7 +5587,7 @@ public class TestHRegion { // 2 times, one for region open, the other close region verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), - editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any()); + editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any()); WALEdit edit = editCaptor.getAllValues().get(1); assertNotNull(edit);
