Repository: hbase
Updated Branches:
refs/heads/branch-1.0 9ff10759c -> fbbbf7e6d
HBASE-12782 ITBLL fails for me if generator does anything but 5M per maptask
Conflicts:
hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
Conflicts:
hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fbbbf7e6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fbbbf7e6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fbbbf7e6
Branch: refs/heads/branch-1.0
Commit: fbbbf7e6d809c5c83c042ec5741a3d9b2fd712c6
Parents: 9ff1075
Author: stack <[email protected]>
Authored: Fri Jan 30 19:12:17 2015 -0800
Committer: stack <[email protected]>
Committed: Fri Jan 30 19:34:04 2015 -0800
----------------------------------------------------------------------
.../hadoop/hbase/DistributedHBaseCluster.java | 4 +-
.../test/IntegrationTestBigLinkedList.java | 418 ++++++++++++++++---
.../hadoop/hbase/backup/HFileArchiver.java | 4 +-
.../hadoop/hbase/mapreduce/WALPlayer.java | 64 +--
.../hadoop/hbase/master/SplitLogManager.java | 2 +-
.../hbase/master/cleaner/CleanerChore.java | 4 +-
.../hadoop/hbase/regionserver/HRegion.java | 44 +-
.../hadoop/hbase/regionserver/wal/FSHLog.java | 1 +
.../org/apache/hadoop/hbase/util/FSUtils.java | 2 +-
.../hadoop/hbase/wal/WALPrettyPrinter.java | 27 +-
.../apache/hadoop/hbase/wal/WALSplitter.java | 41 +-
hbase-server/src/test/data/0000000000000016310 | Bin 0 -> 11776703 bytes
.../hadoop/hbase/HBaseTestingUtility.java | 4 +-
.../hbase/regionserver/TestRecoveredEdits.java | 177 ++++++++
14 files changed, 647 insertions(+), 145 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
----------------------------------------------------------------------
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
index 6bc4143..fc88329 100644
---
a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Comparator;
import java.util.List;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterManager.ServiceType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
@@ -37,7 +38,6 @@ import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-
import com.google.common.collect.Sets;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
----------------------------------------------------------------------
diff --git
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
index bd13800..e43881a 100644
---
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
+++
b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java
@@ -21,12 +21,16 @@ package org.apache.hadoop.hbase.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,12 +44,18 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
@@ -59,10 +69,9 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -72,12 +81,15 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl;
+import org.apache.hadoop.hbase.mapreduce.WALPlayer;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
@@ -90,10 +102,15 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -384,6 +401,9 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
current[i] = new byte[key.getLength()];
System.arraycopy(key.getBytes(), 0, current[i], 0, key.getLength());
if (++i == current.length) {
+ LOG.info("Persisting current.length=" + current.length + ", count="
+ count + ", id=" +
+ Bytes.toStringBinary(id) + ", current=" +
Bytes.toStringBinary(current[0]) +
+ ", i=" + i);
persist(output, count, prev, current, id);
i = 0;
@@ -475,8 +495,7 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
"pre-splitting table into " + totalNumberOfRegions + " regions "
+
"(default regions per server: " + regionsPerServer + ")");
- byte[][] splits = new RegionSplitter.UniformSplit().split(
- totalNumberOfRegions);
+ byte[][] splits = new
RegionSplitter.UniformSplit().split(totalNumberOfRegions);
admin.createTable(htd, splits);
}
@@ -566,6 +585,159 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
}
/**
+ * Tool to search for missing rows in WALs and hfiles.
+ * Pass in a file or dir of keys to search for. The key file must have been written by the
+ * Verify step (we depend on the format it writes out). We'll read the keys in and then
+ * search the hbase WALs and oldWALs dirs (some of this is TODO).
+ */
+ static class Search extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(Search.class);
+ protected Job job;
+
+ private static void printUsage(final String error) {
+ if (error != null && error.length() > 0) System.out.println("ERROR: " +
error);
+ System.err.println("Usage: search <KEYS_DIR> [<MAPPERS_COUNT>]");
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 1 || args.length > 2) {
+ printUsage(null);
+ return 1;
+ }
+ Path inputDir = new Path(args[0]);
+ int numMappers = 1;
+ if (args.length > 1) {
+ numMappers = Integer.parseInt(args[1]);
+ }
+ return run(inputDir, numMappers);
+ }
+
+ /**
+ * WALPlayer override that searches for keys loaded in the setup.
+ */
+ public static class WALSearcher extends WALPlayer {
+ public WALSearcher(Configuration conf) {
+ super(conf);
+ }
+
+ /**
+ * The actual searcher mapper.
+ */
+ public static class WALMapperSearcher extends WALMapper {
+ private SortedSet<byte []> keysToFind;
+
+ @Override
+ public void setup(Mapper<WALKey, WALEdit, ImmutableBytesWritable,
Mutation>.Context context)
+ throws IOException {
+ super.setup(context);
+ try {
+ this.keysToFind = readKeysToSearch(context.getConfiguration());
+ LOG.info("Loaded keys to find: count=" + this.keysToFind.size());
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException(e.toString());
+ }
+ }
+
+ @Override
+ protected boolean filter(Context context, Cell cell) {
+ // TODO: Can I do a better compare than this copying out key?
+ byte [] row = new byte [cell.getRowLength()];
+ System.arraycopy(cell.getRowArray(), cell.getRowOffset(), row, 0,
cell.getRowLength());
+ boolean b = this.keysToFind.contains(row);
+ if (b) {
+ String keyStr = Bytes.toStringBinary(row);
+ LOG.info("Found cell=" + cell);
+ context.getCounter(FOUND_GROUP_KEY, keyStr).increment(1);
+ }
+ return b;
+ }
+ }
+
+ // Put in place the above WALMapperSearcher.
+ @Override
+ public Job createSubmittableJob(String[] args) throws IOException {
+ Job job = super.createSubmittableJob(args);
+ // Call my class instead.
+ job.setJarByClass(WALMapperSearcher.class);
+ job.setMapperClass(WALMapperSearcher.class);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ return job;
+ }
+ }
+
+ static final String FOUND_GROUP_KEY = "Found";
+ static final String SEARCHER_INPUTDIR_KEY = "searcher.keys.inputdir";
+
+ public int run(Path inputDir, int numMappers) throws Exception {
+ getConf().set(SEARCHER_INPUTDIR_KEY, inputDir.toString());
+ SortedSet<byte []> keys = readKeysToSearch(getConf());
+ if (keys.isEmpty()) throw new RuntimeException("No keys to find");
+ LOG.info("Count of keys to find: " + keys.size());
+ for(byte [] key: keys) LOG.info("Key: " + Bytes.toStringBinary(key));
+ Path hbaseDir = new Path(getConf().get(HConstants.HBASE_DIR));
+ // Now read all WALs. In two dirs. Presumes certain layout.
+ Path walsDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME);
+ Path oldWalsDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+ LOG.info("Running Search with keys inputDir=" + inputDir +",
numMappers=" + numMappers +
+ " against " + getConf().get(HConstants.HBASE_DIR));
+ int ret = ToolRunner.run(new WALSearcher(getConf()), new String []
{walsDir.toString(), ""});
+ if (ret != 0) return ret;
+ return ToolRunner.run(new WALSearcher(getConf()), new String []
{oldWalsDir.toString(), ""});
+ }
+
+ static SortedSet<byte []> readKeysToSearch(final Configuration conf)
+ throws IOException, InterruptedException {
+ Path keysInputDir = new Path(conf.get(SEARCHER_INPUTDIR_KEY));
+ FileSystem fs = FileSystem.get(conf);
+ SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ if (!fs.exists(keysInputDir)) {
+ throw new FileNotFoundException(keysInputDir.toString());
+ }
+ if (!fs.isDirectory(keysInputDir)) {
+ throw new UnsupportedOperationException("TODO");
+ } else {
+ RemoteIterator<LocatedFileStatus> iterator =
fs.listFiles(keysInputDir, false);
+ while(iterator.hasNext()) {
+ LocatedFileStatus keyFileStatus = iterator.next();
+ // Skip "_SUCCESS" file.
+ if (keyFileStatus.getPath().getName().startsWith("_")) continue;
+ result.addAll(readFileToSearch(conf, fs, keyFileStatus));
+ }
+ }
+ return result;
+ }
+
+ private static SortedSet<byte []> readFileToSearch(final Configuration
conf,
+ final FileSystem fs, final LocatedFileStatus keyFileStatus)
+ throws IOException, InterruptedException {
+ SortedSet<byte []> result = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+ // Return entries that are flagged Counts.UNDEFINED in the value. Return
the row. This is
+ // what is missing.
+ TaskAttemptContext context = new TaskAttemptContextImpl(conf, new
TaskAttemptID());
+ try (SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader rr
=
+ new
SequenceFileAsBinaryInputFormat.SequenceFileAsBinaryRecordReader()) {
+ InputSplit is =
+ new FileSplit(keyFileStatus.getPath(), 0, keyFileStatus.getLen(),
new String [] {});
+ rr.initialize(is, context);
+ while (rr.nextKeyValue()) {
+ rr.getCurrentKey();
+ BytesWritable bw = rr.getCurrentValue();
+ switch (Verify.VerifyReducer.whichType(bw.getBytes())) {
+ case UNDEFINED:
+ byte [] key = new byte [rr.getCurrentKey().getLength()];
+ System.arraycopy(rr.getCurrentKey().getBytes(), 0, key, 0,
+ rr.getCurrentKey().getLength());
+ result.add(key);
+ break;
+ }
+ }
+ }
+ return result;
+ }
+ }
+
+ /**
* A Map Reduce job that verifies that the linked lists generated by
* {@link Generator} do not have any holes.
*/
@@ -596,21 +768,84 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
}
}
+ /**
+ * Don't change the order of these enums. Their ordinals are used as the type flag when we
+ * emit problems found by the reducer.
+ */
public static enum Counts {
- UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES
+ UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES,
EXTRA_UNDEF_REFERENCES
}
- public static class VerifyReducer extends
Reducer<BytesWritable,BytesWritable,Text,Text> {
+ /**
+ * Per reducer, we output problem rows as byte arrays so they can be used as input for
+ * subsequent investigative mapreduce jobs. Each emitted value is prefaced by a flag saying
+ * what sort of emission it is; the flag is the Counts enum ordinal written out as a short.
+ */
+ public static class VerifyReducer
+ extends Reducer<BytesWritable,BytesWritable,BytesWritable,BytesWritable> {
private ArrayList<byte[]> refs = new ArrayList<byte[]>();
+ private final BytesWritable UNREF =
+ new BytesWritable(addPrefixFlag(Counts.UNREFERENCED.ordinal(), new
byte [] {}));
private AtomicInteger rows = new AtomicInteger(0);
+ private Connection connection;
+
+ @Override
+ protected void setup(Reducer<BytesWritable, BytesWritable,
BytesWritable, BytesWritable>.Context context)
+ throws IOException, InterruptedException {
+ super.setup(context);
+ this.connection =
ConnectionFactory.createConnection(context.getConfiguration());
+ }
+
+ @Override
+ protected void cleanup(Reducer<BytesWritable, BytesWritable,
BytesWritable, BytesWritable>.Context context)
+ throws IOException, InterruptedException {
+ if (this.connection != null) this.connection.close();
+ super.cleanup(context);
+ }
+
+ /**
+ * @param ordinal
+ * @param r
+ * @return A new byte array with <code>ordinal</code> as a Bytes.SIZEOF_SHORT-byte prefix
+ * followed by <code>r</code>
+ */
+ public static byte [] addPrefixFlag(final int ordinal, final byte [] r) {
+ byte [] prefix = Bytes.toBytes((short)ordinal);
+ if (prefix.length != Bytes.SIZEOF_SHORT) {
+ throw new RuntimeException("Unexpected size: " + prefix.length);
+ }
+ byte [] result = new byte [prefix.length + r.length];
+ System.arraycopy(prefix, 0, result, 0, prefix.length);
+ System.arraycopy(r, 0, result, prefix.length, r.length);
+ return result;
+ }
+
+ /**
+ * @param bs
+ * @return Type from the Counts enum of this row. Reads prefix added by
+ * {@link #addPrefixFlag(int, byte[])}
+ */
+ public static Counts whichType(final byte [] bs) {
+ int ordinal = Bytes.toShort(bs, 0, Bytes.SIZEOF_SHORT);
+ return Counts.values()[ordinal];
+ }
+
+ /**
+ * @param bw
+ * @return Row bytes minus the type flag.
+ */
+ public static byte [] getRowOnly(BytesWritable bw) {
+ byte [] bytes = new byte [bw.getLength() - Bytes.SIZEOF_SHORT];
+ System.arraycopy(bw.getBytes(), Bytes.SIZEOF_SHORT, bytes, 0,
bytes.length);
+ return bytes;
+ }
@Override
public void reduce(BytesWritable key, Iterable<BytesWritable> values,
Context context)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
int defCount = 0;
-
refs.clear();
for (BytesWritable type : values) {
if (type.getLength() == DEF.getLength()) {
@@ -623,48 +858,110 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
}
// TODO check for more than one def, should not happen
-
StringBuilder refsSb = null;
- String keyString = null;
+ String keyString = Bytes.toStringBinary(key.getBytes(), 0,
key.getLength());
if (defCount == 0 || refs.size() != 1) {
- refsSb = new StringBuilder();
- String comma = "";
- for (byte[] ref : refs) {
- refsSb.append(comma);
- comma = ",";
- refsSb.append(Bytes.toStringBinary(ref));
- }
- keyString = Bytes.toStringBinary(key.getBytes(), 0, key.getLength());
-
- LOG.error("Linked List error: Key = " + keyString + " References = "
+ refsSb.toString());
+ refsSb = dumpExtraInfoOnRefs(key, context, refs);
+ LOG.error("LinkedListError: key=" + keyString + ", reference(s)=" +
+ (refsSb != null? refsSb.toString(): ""));
}
if (defCount == 0 && refs.size() > 0) {
- // this is bad, found a node that is referenced but not defined. It
must have been
+ // This is bad, found a node that is referenced but not defined. It
must have been
// lost, emit some info about this node for debugging purposes.
- context.write(new Text(keyString), new Text(refsSb.toString()));
- context.getCounter(Counts.UNDEFINED).increment(1);
+ // Write out a line per reference. If more than one, flag it.
+ for (int i = 0; i < refs.size(); i++) {
+ byte [] bs = refs.get(i);
+ int ordinal;
+ if (i <= 0) {
+ ordinal = Counts.UNDEFINED.ordinal();
+ context.write(key, new BytesWritable(addPrefixFlag(ordinal,
bs)));
+ context.getCounter(Counts.UNDEFINED).increment(1);
+ } else {
+ ordinal = Counts.EXTRA_UNDEF_REFERENCES.ordinal();
+ context.write(key, new BytesWritable(addPrefixFlag(ordinal,
bs)));
+ }
+ }
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
+ // Print out the missing row; doing a get on the reference gives info on when the
+ // referencer was added, which can help a little with debugging. This info is only
+ // available in the mapper output -- the 'Linked List error Key...' log message above.
+ // What we emit here is useless for debugging.
context.getCounter("undef", keyString).increment(1);
}
} else if (defCount > 0 && refs.size() == 0) {
// node is defined but not referenced
- context.write(new Text(keyString), new Text("none"));
+ context.write(key, UNREF);
context.getCounter(Counts.UNREFERENCED).increment(1);
if (rows.addAndGet(1) < MISSING_ROWS_TO_LOG) {
context.getCounter("unref", keyString).increment(1);
}
} else {
if (refs.size() > 1) {
- if (refsSb != null) {
- context.write(new Text(keyString), new Text(refsSb.toString()));
+ // Skip first reference.
+ for (int i = 1; i < refs.size(); i++) {
+ context.write(key,
+ new
BytesWritable(addPrefixFlag(Counts.EXTRAREFERENCES.ordinal(), refs.get(i))));
}
context.getCounter(Counts.EXTRAREFERENCES).increment(refs.size() -
1);
}
// node is defined and referenced
context.getCounter(Counts.REFERENCED).increment(1);
}
+ }
+ /**
+ * Dump out extra info around references if there are any. Helps
debugging.
+ * @return StringBuilder filled with references if any.
+ * @throws IOException
+ */
+ private StringBuilder dumpExtraInfoOnRefs(final BytesWritable key, final
Context context,
+ final List<byte []> refs)
+ throws IOException {
+ StringBuilder refsSb = null;
+ if (refs.isEmpty()) return refsSb;
+ refsSb = new StringBuilder();
+ String comma = "";
+ // If a row is a reference but has no define, print the content of the
row that has
+ // this row as a 'prev'; it will help debug. The missing row was
written just before
+ // the row we are dumping out here.
+ TableName tn = getTableName(context.getConfiguration());
+ try (Table t = this.connection.getTable(tn)) {
+ for (byte [] ref : refs) {
+ Result r = t.get(new Get(ref));
+ List<Cell> cells = r.listCells();
+ String ts = (cells != null && !cells.isEmpty())?
+ new java.util.Date(cells.get(0).getTimestamp()).toString(): "";
+ byte [] b = r.getValue(FAMILY_NAME, COLUMN_CLIENT);
+ String jobStr = (b != null && b.length > 0)? Bytes.toString(b): "";
+ b = r.getValue(FAMILY_NAME, COLUMN_COUNT);
+ long count = (b != null && b.length > 0)? Bytes.toLong(b): -1;
+ b = r.getValue(FAMILY_NAME, COLUMN_PREV);
+ String refRegionLocation = "";
+ String keyRegionLocation = "";
+ if (b != null && b.length > 0) {
+ try (RegionLocator rl = this.connection.getRegionLocator(tn)) {
+ HRegionLocation hrl = rl.getRegionLocation(b);
+ if (hrl != null) refRegionLocation = hrl.toString();
+ // Key here probably has trailing zeros on it.
+ hrl = rl.getRegionLocation(key.getBytes());
+ if (hrl != null) keyRegionLocation = hrl.toString();
+ }
+ }
+ LOG.error("Extras on ref without a def, ref=" +
Bytes.toStringBinary(ref) +
+ ", refPrevEqualsKey=" +
+ (Bytes.compareTo(key.getBytes(), 0, key.getLength(), b, 0,
b.length) == 0) +
+ ", key=" + Bytes.toStringBinary(key.getBytes(), 0,
key.getLength()) +
+ ", ref row date=" + ts + ", jobStr=" + jobStr +
+ ", ref row count=" + count +
+ ", ref row regionLocation=" + refRegionLocation +
+ ", key row regionLocation=" + keyRegionLocation);
+ refsSb.append(comma);
+ comma = ",";
+ refsSb.append(Bytes.toStringBinary(ref));
+ }
+ }
+ return refsSb;
}
}
@@ -709,7 +1006,9 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
job.setReducerClass(VerifyReducer.class);
- job.setOutputFormatClass(TextOutputFormat.class);
+ job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(BytesWritable.class);
TextOutputFormat.setOutputPath(job, outputDir);
boolean success = job.waitForCompletion(true);
@@ -758,23 +1057,26 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
protected void handleFailure(Counters counters) throws IOException {
Configuration conf = job.getConfiguration();
- HConnection conn = HConnectionManager.getConnection(conf);
TableName tableName = getTableName(conf);
- CounterGroup g = counters.getGroup("undef");
- Iterator<Counter> it = g.iterator();
- while (it.hasNext()) {
- String keyString = it.next().getName();
- byte[] key = Bytes.toBytes(keyString);
- HRegionLocation loc = conn.relocateRegion(tableName, key);
- LOG.error("undefined row " + keyString + ", " + loc);
- }
- g = counters.getGroup("unref");
- it = g.iterator();
- while (it.hasNext()) {
- String keyString = it.next().getName();
- byte[] key = Bytes.toBytes(keyString);
- HRegionLocation loc = conn.relocateRegion(tableName, key);
- LOG.error("unreferred row " + keyString + ", " + loc);
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ try (RegionLocator rl = conn.getRegionLocator(tableName)) {
+ CounterGroup g = counters.getGroup("undef");
+ Iterator<Counter> it = g.iterator();
+ while (it.hasNext()) {
+ String keyString = it.next().getName();
+ byte[] key = Bytes.toBytes(keyString);
+ HRegionLocation loc = rl.getRegionLocation(key, true);
+ LOG.error("undefined row " + keyString + ", " + loc);
+ }
+ g = counters.getGroup("unref");
+ it = g.iterator();
+ while (it.hasNext()) {
+ String keyString = it.next().getName();
+ byte[] key = Bytes.toBytes(keyString);
+ HRegionLocation loc = rl.getRegionLocation(key, true);
+ LOG.error("unreferred row " + keyString + ", " + loc);
+ }
+ }
}
}
}
@@ -943,7 +1245,8 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
}
/**
- * A stand alone program that follows a linked list created by {@link
Generator} and prints timing info.
+ * A stand alone program that follows a linked list created by {@link
Generator} and prints
+ * timing info.
*/
private static class Walker extends Configured implements Tool {
@Override
@@ -1045,7 +1348,6 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
}
private static class Clean extends Configured implements Tool {
-
@Override public int run(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: Clean <output dir>");
@@ -1133,16 +1435,17 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
private void printCommands() {
System.err.println("Commands:");
- System.err.println(" Generator Map only job that generates data.");
- System.err.println(" Verify A map reduce job that looks for holes.
Look at the counts ");
+ System.err.println(" generator Map only job that generates data.");
+ System.err.println(" verify A map reduce job that looks for holes.
Look at the counts ");
System.err.println(" after running. See REFERENCED and
UNREFERENCED are ok. Any ");
System.err.println(" UNDEFINED counts are bad. Do not run with
the Generator.");
- System.err.println(" Walker " +
- "Standalong program that starts following a linked list & emits timing
info.");
- System.err.println(" Print Standalone program that prints nodes in
the linked list.");
- System.err.println(" Delete Standalone program that deletes a·single
node.");
- System.err.println(" Loop Program to Loop through Generator and
Verify steps");
- System.err.println(" Clean Program to clean all left over detritus.");
+ System.err.println(" walker " +
+ "Standalone program that starts following a linked list & emits timing
info.");
+ System.err.println(" print Standalone program that prints nodes in
the linked list.");
+ System.err.println(" delete Standalone program that deletes a·single
node.");
+ System.err.println(" loop Program to Loop through Generator and
Verify steps");
+ System.err.println(" clean Program to clean all left over detritus.");
+ System.err.println(" search Search for missing keys.");
System.err.flush();
}
@@ -1155,6 +1458,7 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
printUsage(this.getClass().getSimpleName() +
" <general options> COMMAND [<COMMAND options>]", "General options:",
"");
printCommands();
+ // Have to throw an exception here to stop the processing. Looks ugly
but gets message across.
throw new RuntimeException("Incorrect Number of args.");
}
toRun = args[0];
@@ -1165,7 +1469,7 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
public int runTestFromCommandLine() throws Exception {
Tool tool = null;
- if (toRun.equals("Generator")) {
+ if (toRun.equalsIgnoreCase("Generator")) {
tool = new Generator();
} else if (toRun.equalsIgnoreCase("Verify")) {
tool = new Verify();
@@ -1181,6 +1485,8 @@ public class IntegrationTestBigLinkedList extends
IntegrationTestBase {
tool = new Delete();
} else if (toRun.equalsIgnoreCase("Clean")) {
tool = new Clean();
+ } else if (toRun.equalsIgnoreCase("Search")) {
+ tool = new Search();
} else {
usage();
throw new RuntimeException("Unknown arg");
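The Verify output above replaces the old Text key/values with BytesWritable values that carry a
two-byte type flag (the Counts ordinal as a short) in front of each problem row; the new Search
tool reads that flag back to pick out the UNDEFINED rows. A minimal, self-contained sketch of the
round trip (the wrapper class and row value are invented for illustration; addPrefixFlag and
whichType mirror the methods added in the diff above):

  import org.apache.hadoop.hbase.util.Bytes;

  public class PrefixFlagSketch {
    // Same ordering as the Counts enum in the patch; the ordinal is the wire flag.
    enum Counts { UNREFERENCED, UNDEFINED, REFERENCED, CORRUPT, EXTRAREFERENCES,
        EXTRA_UNDEF_REFERENCES }

    static byte[] addPrefixFlag(int ordinal, byte[] row) {
      byte[] prefix = Bytes.toBytes((short) ordinal);       // Bytes.SIZEOF_SHORT bytes
      byte[] result = new byte[prefix.length + row.length];
      System.arraycopy(prefix, 0, result, 0, prefix.length);
      System.arraycopy(row, 0, result, prefix.length, row.length);
      return result;
    }

    static Counts whichType(byte[] flagged) {
      // Read the leading short back and map it to the enum constant.
      return Counts.values()[Bytes.toShort(flagged, 0, Bytes.SIZEOF_SHORT)];
    }

    public static void main(String[] args) {
      byte[] flagged = addPrefixFlag(Counts.UNDEFINED.ordinal(), Bytes.toBytes("missing-row"));
      System.out.println(whichType(flagged));               // prints UNDEFINED
    }
  }

Per its usage string ("search <KEYS_DIR> [<MAPPERS_COUNT>]"), Search is pointed at the Verify
output dir; it loads the rows flagged UNDEFINED and then runs the WALSearcher job against the
WALs and oldWALs dirs looking for them.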
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index a04cb88..d682ccc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -221,9 +221,9 @@ public class HFileArchiver {
}
// otherwise we attempt to archive the store files
- if (LOG.isTraceEnabled()) LOG.trace("Archiving compacted store files.");
+ if (LOG.isDebugEnabled()) LOG.debug("Archiving compacted store files.");
- // wrap the storefile into a File
+ // Wrap the storefile into a File
StoreToFile getStorePath = new StoreToFile(fs);
Collection<File> storeFiles = Collections2.transform(compactedFiles,
getStorePath);
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index a487878..fe83afb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -123,14 +123,12 @@ public class WALPlayer extends Configured implements Tool
{
* A mapper that writes out {@link Mutation} to be directly applied to
* a running HBase instance.
*/
- static class WALMapper
+ protected static class WALMapper
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
- private Map<TableName, TableName> tables =
- new TreeMap<TableName, TableName>();
+ private Map<TableName, TableName> tables = new TreeMap<TableName,
TableName>();
@Override
- public void map(WALKey key, WALEdit value,
- Context context)
+ public void map(WALKey key, WALEdit value, Context context)
throws IOException {
try {
if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
@@ -145,27 +143,29 @@ public class WALPlayer extends Configured implements Tool
{
// filtering WAL meta entries
if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
- // A WALEdit may contain multiple operations (HBASE-3584) and/or
- // multiple rows (HBASE-5229).
- // Aggregate as much as possible into a single Put/Delete
- // operation before writing to the context.
- if (lastCell == null || lastCell.getTypeByte() !=
cell.getTypeByte()
- || !CellUtil.matchingRow(lastCell, cell)) {
- // row or type changed, write out aggregate KVs.
- if (put != null) context.write(tableOut, put);
- if (del != null) context.write(tableOut, del);
-
+ // Allow a subclass to filter out this cell.
+ if (filter(context, cell)) {
+ // A WALEdit may contain multiple operations (HBASE-3584) and/or
+ // multiple rows (HBASE-5229).
+ // Aggregate as much as possible into a single Put/Delete
+ // operation before writing to the context.
+ if (lastCell == null || lastCell.getTypeByte() !=
cell.getTypeByte()
+ || !CellUtil.matchingRow(lastCell, cell)) {
+ // row or type changed, write out aggregate KVs.
+ if (put != null) context.write(tableOut, put);
+ if (del != null) context.write(tableOut, del);
+ if (CellUtil.isDelete(cell)) {
+ del = new Delete(cell.getRow());
+ } else {
+ put = new Put(cell.getRow());
+ }
+ }
if (CellUtil.isDelete(cell)) {
- del = new Delete(cell.getRow());
+ del.addDeleteMarker(cell);
} else {
- put = new Put(cell.getRow());
+ put.add(cell);
}
}
- if (CellUtil.isDelete(cell)) {
- del.addDeleteMarker(cell);
- } else {
- put.add(cell);
- }
lastCell = cell;
}
// write residual KVs
@@ -177,18 +177,30 @@ public class WALPlayer extends Configured implements Tool
{
}
}
+ /**
+ * @param cell
+ * @return Return true if we are to emit this cell.
+ */
+ protected boolean filter(Context context, final Cell cell) {
+ return true;
+ }
+
@Override
public void setup(Context context) throws IOException {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
- if (tablesToUse == null || tableMap == null || tablesToUse.length !=
tableMap.length) {
+ if (tablesToUse == null && tableMap == null) {
+ // Then user wants all tables.
+ } else if (tablesToUse == null || tableMap == null || tablesToUse.length
!= tableMap.length) {
// this can only happen when WALMapper is used directly by a class
other than WALPlayer
throw new IOException("No tables or incorrect table mapping
specified.");
}
int i = 0;
- for (String table : tablesToUse) {
- tables.put(TableName.valueOf(table),
+ if (tablesToUse != null) {
+ for (String table : tablesToUse) {
+ tables.put(TableName.valueOf(table),
TableName.valueOf(tableMap[i++]));
+ }
}
}
}
@@ -326,4 +338,4 @@ public class WALPlayer extends Configured implements Tool {
Job job = createSubmittableJob(otherArgs);
return job.waitForCompletion(true) ? 0 : 1;
}
-}
+}
\ No newline at end of file
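With WALMapper now protected and the new filter(Context, Cell) hook, WALPlayer can be specialized
without copying the mapper. A hedged sketch of the pattern (PrefixWALPlayer, PrefixMapper and the
row prefix are invented names for illustration; the override structure follows WALSearcher above):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.CellUtil;
  import org.apache.hadoop.hbase.mapreduce.WALPlayer;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.mapreduce.Job;

  public class PrefixWALPlayer extends WALPlayer {
    public PrefixWALPlayer(Configuration conf) {
      super(conf);
    }

    // Mapper that only replays cells whose row starts with a fixed prefix (assumption).
    public static class PrefixMapper extends WALMapper {
      private static final byte[] PREFIX = Bytes.toBytes("myprefix");
      @Override
      protected boolean filter(Context context, Cell cell) {
        // Returning false drops the cell before it is aggregated into a Put/Delete.
        return Bytes.startsWith(CellUtil.cloneRow(cell), PREFIX);
      }
    }

    @Override
    public Job createSubmittableJob(String[] args) throws IOException {
      Job job = super.createSubmittableJob(args);
      job.setMapperClass(PrefixMapper.class);  // swap in the filtering mapper
      return job;
    }
  }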
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 23ef6a5..221b173 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -164,7 +164,7 @@ public class SplitLogManager {
/**
* Get a list of paths that need to be split given a set of server-specific
directories and
- * optinally a filter.
+ * optionally a filter.
*
* See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more
info on directory
* layout.
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 6e2f4fd..24a0f9b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -249,8 +249,8 @@ public abstract class CleanerChore<T extends
FileCleanerDelegate> extends Chore
int deletedFileCount = 0;
for (FileStatus file : filesToDelete) {
Path filePath = file.getPath();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Removing: " + filePath + " from archive");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing: " + filePath + " from archive");
}
try {
boolean success = this.fs.delete(filePath, false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 43e451a..53a6081 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -3525,11 +3526,24 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
internalFlushcache(null, seqid, status);
}
// Now delete the content of recovered edits. We're done w/ them.
- for (Path file: files) {
- if (!fs.delete(file, false)) {
- LOG.error("Failed delete of " + file);
- } else {
- LOG.debug("Deleted recovered.edits file=" + file);
+ if (files.size() > 0 &&
this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
+ // For debugging data loss issues!
+ // If this flag is set, make use of the hfile archiving by making
recovered.edits a fake
+ // column family. Have to fake out file type too by casting our
recovered.edits as storefiles
+ String fakeFamilyName =
WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
+ Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
+ for (Path file: files) {
+ fakeStoreFiles.add(new
StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
+ null, null));
+ }
+ getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
+ } else {
+ for (Path file: files) {
+ if (!fs.delete(file, false)) {
+ LOG.error("Failed delete of " + file);
+ } else {
+ LOG.debug("Deleted recovered.edits file=" + file);
+ }
}
}
return seqid;
@@ -3569,8 +3583,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
try {
// How many edits seen before we check elapsed time
- int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
- 2000);
+ int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
2000);
// How often to send a progress report (default 1/2 master timeout)
int period = this.conf.getInt("hbase.hstore.report.period", 300000);
long lastReport = EnvironmentEdgeManager.currentTime();
@@ -3629,21 +3642,24 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
continue;
}
}
+ // Check this edit is for this region.
+ if (!Bytes.equals(key.getEncodedRegionName(),
+ this.getRegionInfo().getEncodedNameAsBytes())) {
+ skippedEdits++;
+ continue;
+ }
boolean flush = false;
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the
special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
- if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
- !Bytes.equals(key.getEncodedRegionName(),
- this.getRegionInfo().getEncodedNameAsBytes())) {
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
//this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
completeCompactionMarker(compaction);
}
-
skippedEdits++;
continue;
}
@@ -3669,10 +3685,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
// Once we are over the limit, restoreEdit will keep returning
true to
// flush -- but don't flush until we've played all the kvs that
make up
// the WALEdit.
- if (!flush) {
- flush = restoreEdit(store, cell);
- }
-
+ flush |= restoreEdit(store, cell);
editsCount++;
}
if (flush) {
@@ -5002,6 +5015,7 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver { //
* @return qualified path of region directory
*/
@Deprecated
+ @VisibleForTesting
public static Path getRegionDir(final Path rootdir, final HRegionInfo info) {
return new Path(
FSUtils.getTableDir(rootdir, info.getTable()), info.getEncodedName());
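The recovered.edits change above adds an opt-in debug switch, "hbase.region.archive.recovered.edits":
when set, replayed recovered.edits files are handed to the HFile archiver (dressed up as store files
of a fake family) instead of being deleted, so they remain available for data-loss investigation.
A minimal sketch of flipping it on programmatically (it could equally be set in hbase-site.xml;
the default stays false):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;

  public class EnableRecoveredEditsArchiving {
    public static void main(String[] args) {
      Configuration conf = HBaseConfiguration.create();
      // Debug flag from this patch: archive replayed recovered.edits rather than delete them.
      conf.setBoolean("hbase.region.archive.recovered.edits", true);
      System.out.println(conf.getBoolean("hbase.region.archive.recovered.edits", false));
    }
  }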
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 2ca0a39..195f9b7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -988,6 +988,7 @@ public class FSHLog implements WAL {
i.preLogArchive(p, newPath);
}
}
+ LOG.info("Archiving " + p + " to " + newPath);
if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
throw new IOException("Unable to rename " + p + " to " + newPath);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index ef1a0ce..9a6576a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1162,7 +1162,7 @@ public abstract class FSUtils {
private List<String> blacklist;
/**
- * Create a filter on the give filesystem with the specified blacklist
+ * Create a filter on the given filesystem with the specified blacklist
* @param fs filesystem to filter
* @param directoryNameBlackList list of the names of the directories to
filter. If
* <tt>null</tt>, all directories are returned
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
index 104faad..720cedc 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALPrettyPrinter.java
@@ -75,7 +75,7 @@ public class WALPrettyPrinter {
// enable in order to output a single list of transactions from several files
private boolean persistentOutput;
private boolean firstTxn;
- // useful for programatic capture of JSON output
+ // useful for programmatic capture of JSON output
private PrintStream out;
// for JSON encoding
private static final ObjectMapper MAPPER = new ObjectMapper();
@@ -267,8 +267,9 @@ public class WALPrettyPrinter {
Map<String, Object> op = new HashMap<String,
Object>(toStringMap(cell));
if (outputValues) op.put("value",
Bytes.toStringBinary(cell.getValue()));
// check row output filter
- if (row == null || ((String) op.get("row")).equals(row))
+ if (row == null || ((String) op.get("row")).equals(row)) {
actions.add(op);
+ }
}
if (actions.size() == 0)
continue;
@@ -283,22 +284,16 @@ public class WALPrettyPrinter {
out.print(MAPPER.writeValueAsString(txn));
} else {
// Pretty output, complete with indentation by atomic action
- out.println("Sequence " + txn.get("sequence") + " "
- + "from region " + txn.get("region") + " " + "in table "
- + txn.get("table") + " at write timestamp: " + new
Date(writeTime));
+ out.println("Sequence=" + txn.get("sequence") + " "
+ + ", region=" + txn.get("region") + " at write timestamp=" + new
Date(writeTime));
for (int i = 0; i < actions.size(); i++) {
Map op = actions.get(i);
- out.println(" Action:");
- out.println(" row: " + op.get("row"));
- out.println(" column: " + op.get("family") + ":"
- + op.get("qualifier"));
- out.println(" timestamp: "
- + (new Date((Long) op.get("timestamp"))));
- if(op.get("tag") != null) {
+ out.println("row=" + op.get("row") +
+ ", column=" + op.get("family") + ":" + op.get("qualifier"));
+ if (op.get("tag") != null) {
out.println(" tag: " + op.get("tag"));
}
- if (outputValues)
- out.println(" value: " + op.get("value"));
+ if (outputValues) out.println(" value: " + op.get("value"));
}
}
}
@@ -347,8 +342,6 @@ public class WALPrettyPrinter {
* Command line arguments
* @throws IOException
* Thrown upon file system errors etc.
- * @throws ParseException
- * Thrown if command-line parsing fails.
*/
public static void run(String[] args) throws IOException {
// create options
@@ -364,7 +357,7 @@ public class WALPrettyPrinter {
WALPrettyPrinter printer = new WALPrettyPrinter();
CommandLineParser parser = new PosixParser();
- List files = null;
+ List<?> files = null;
try {
CommandLine cmd = parser.parse(options, args);
files = cmd.getArgList();
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index d7d4a61..1744adf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -49,13 +49,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
@@ -79,6 +74,7 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagRewriteCell;
import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -108,9 +104,12 @@ import
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.R
import
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -118,6 +117,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WAL.Reader;
+import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.io.MultipleIOException;
@@ -125,13 +127,10 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
-// imports for things that haven't moved from regionserver.wal yet.
-import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ServiceException;
/**
* This class is responsible for splitting up a bunch of regionserver commit
log
@@ -280,8 +279,7 @@ public class WALSplitter {
* log splitting implementation, splits one log file.
* @param logfile should be an actual log file.
*/
- boolean splitLogFile(FileStatus logfile,
- CancelableProgressable reporter) throws IOException {
+ boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter)
throws IOException {
Preconditions.checkState(status == null);
Preconditions.checkArgument(logfile.isFile(),
"passed in file status is for something other than a regular file.");
@@ -409,8 +407,9 @@ public class WALSplitter {
} finally {
String msg =
"Processed " + editsCount + " edits across " +
outputSink.getNumberOfRecoveredRegions()
- + " regions; log file=" + logPath + " is corrupted = " +
isCorrupted
- + " progress failed = " + progress_failed;
+ + " regions; edits skipped=" + editsSkipped + "; log file=" +
logPath +
+ ", length=" + logfile.getLen() + // See if length got updated
post lease recovery
+ ", corrupted=" + isCorrupted + ", progress failed=" +
progress_failed;
LOG.info(msg);
status.markComplete(msg);
}
@@ -724,8 +723,8 @@ public class WALSplitter {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Written region seqId to file:" + newSeqIdFile + "
,newSeqId=" + newSeqId
- + " ,maxSeqId=" + maxSeqId);
+ LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file,
newSeqId=" + newSeqId
+ + ", maxSeqId=" + maxSeqId);
}
} catch (FileAlreadyExistsException ignored) {
// latest hdfs throws this exception. it's all right if newSeqIdFile
already exists
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/test/data/0000000000000016310
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/data/0000000000000016310
b/hbase-server/src/test/data/0000000000000016310
new file mode 100644
index 0000000..8e58c98
Binary files /dev/null and b/hbase-server/src/test/data/0000000000000016310
differ
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index ee67629..27a7d4b 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -2399,14 +2399,14 @@ public class HBaseTestingUtility extends
HBaseCommonTestingUtility {
* Stops the previously started <code>MiniMRCluster</code>.
*/
public void shutdownMiniMapReduceCluster() {
- LOG.info("Stopping mini mapreduce cluster...");
if (mrCluster != null) {
+ LOG.info("Stopping mini mapreduce cluster...");
mrCluster.shutdown();
mrCluster = null;
+ LOG.info("Mini mapreduce cluster stopped");
}
// Restore configuration to point to local jobtracker
conf.set("mapreduce.jobtracker.address", "local");
- LOG.info("Mini mapreduce cluster stopped");
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/fbbbf7e6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
new file mode 100644
index 0000000..3d651ef
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mortbay.log.Log;
+
+/**
+ * Tests around replay of recovered.edits content.
+ */
+@Category({MediumTests.class})
+public class TestRecoveredEdits {
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ @Rule public TestName testName = new TestName();
+
+ /**
+ * HBASE-12782 ITBLL fails for me if generator does anything but 5M per
maptask.
+ * Create a region. Close it. Then copy into place a file to replay, one
that is bigger than
+ * configured flush size so we bring on lots of flushes. Then reopen and
confirm all edits
+ * made it in.
+ * @throws IOException
+ */
+ @Test (timeout=30000)
+ public void testReplayWorksThoughLotsOfFlushing() throws IOException {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ // Set it so we flush every 1M or so. That's a lot.
+ conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024*1024);
+ // The file of recovered edits has a column family of 'meta'. Also has an
encoded regionname
+ // of 4823016d8fca70b25503ee07f4c6d79f which needs to match on replay.
+ final String encodedRegionName = "4823016d8fca70b25503ee07f4c6d79f";
+ HTableDescriptor htd = new
HTableDescriptor(TableName.valueOf(testName.getMethodName()));
+ final String columnFamily = "meta";
+ byte [][] columnFamilyAsByteArray = new byte [][]
{Bytes.toBytes(columnFamily)};
+ htd.addFamily(new HColumnDescriptor(columnFamily));
+ HRegionInfo hri = new HRegionInfo(htd.getTableName()) {
+ @Override
+ public synchronized String getEncodedName() {
+ return encodedRegionName;
+ }
+
+ // Cache the name because lots of lookups.
+ private byte [] encodedRegionNameAsBytes = null;
+ @Override
+ public synchronized byte[] getEncodedNameAsBytes() {
+ if (encodedRegionNameAsBytes == null) {
+ this.encodedRegionNameAsBytes = Bytes.toBytes(getEncodedName());
+ }
+ return this.encodedRegionNameAsBytes;
+ }
+ };
+ Path hbaseRootDir = TEST_UTIL.getDataTestDir();
+ HRegion region = HRegion.createHRegion(hri, hbaseRootDir, conf, htd, null);
+ assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
+ List<String> storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+ // There should be no store files.
+ assertTrue(storeFiles.isEmpty());
+ region.close();
+ Path regionDir = region.getRegionDir(hbaseRootDir, hri);
+ Path recoveredEditsDir =
WALSplitter.getRegionDirRecoveredEditsDir(regionDir);
+ // This is a little fragile getting this path to a file of 10M of edits.
+ Path recoveredEditsFile = new Path(new Path(
+ System.getProperty("project.build.testSourceDirectory", "src" +
Path.SEPARATOR + "test"),
+ "data"), "0000000000000016310");
+ // Copy this file under the region's recovered.edits dir so it is replayed
on reopen.
+ FileSystem fs = FileSystem.get(conf);
+ Path destination = new Path(recoveredEditsDir,
recoveredEditsFile.getName());
+ fs.copyToLocalFile(recoveredEditsFile, destination);
+ assertTrue(fs.exists(destination));
+ // Now the file 0000000000000016310 is under recovered.edits, reopen the
region to replay.
+ region = HRegion.openHRegion(region, null);
+ assertEquals(encodedRegionName, region.getRegionInfo().getEncodedName());
+ storeFiles = region.getStoreFileList(columnFamilyAsByteArray);
+ // Our 0000000000000016310 is 10MB. Most of the edits are for one region. Let's assume
+ // that if we flush at 1MB, there are at least 3 flushed files that are there because of
+ // the replay of edits.
+ assertTrue("Files count=" + storeFiles.size(), storeFiles.size() > 10);
+ // Now verify all edits made it into the region.
+ int count = verifyAllEditsMadeItIn(fs, conf, recoveredEditsFile, region);
+ Log.info("Checked " + count + " edits made it in");
+ }
+
+ /**
+ * @param fs
+ * @param conf
+ * @param edits
+ * @param region
+ * @return Return how many edits seen.
+ * @throws IOException
+ */
+ private int verifyAllEditsMadeItIn(final FileSystem fs, final Configuration
conf,
+ final Path edits, final HRegion region)
+ throws IOException {
+ int count = 0;
+ // Based on HRegion#replayRecoveredEdits
+ WAL.Reader reader = null;
+ try {
+ reader = WALFactory.createReader(fs, edits, conf);
+ WAL.Entry entry;
+ while ((entry = reader.next()) != null) {
+ WALKey key = entry.getKey();
+ WALEdit val = entry.getEdit();
+ count++;
+ // Check this edit is for this region.
+ if (!Bytes.equals(key.getEncodedRegionName(),
+ region.getRegionInfo().getEncodedNameAsBytes())) {
+ continue;
+ }
+ Cell previous = null;
+ for (Cell cell: val.getCells()) {
+ if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
+ if (previous != null && CellComparator.compareRows(previous, cell)
== 0) continue;
+ previous = cell;
+ Get g = new Get(CellUtil.cloneRow(cell));
+ Result r = region.get(g);
+ boolean found = false;
+ for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
+ Cell current = scanner.current();
+ if (CellComparator.compare(cell, current, true) == 0) {
+ found = true;
+ break;
+ }
+ }
+ assertTrue("Failed to find " + cell, found);
+ }
+ }
+ } finally {
+ if (reader != null) reader.close();
+ }
+ return count;
+ }
+}
\ No newline at end of file