http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java new file mode 100644 index 0000000..4331c0f --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.util.ConfigurationUtil; +import org.apache.hadoop.hbase.util.FSUtils; + +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * Shared implementation of mapreduce code over multiple table snapshots. + * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce + * .MultiTableSnapshotInputFormat} and mapred + * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations. + */ [email protected]({ "HBase" }) [email protected] +public class MultiTableSnapshotInputFormatImpl { + + private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class); + + public static final String RESTORE_DIRS_KEY = + "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping"; + public static final String SNAPSHOT_TO_SCANS_KEY = + "hbase.MultiTableSnapshotInputFormat.snapshotsToScans"; + + /** + * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of + * restoreDir. 
+ * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY} + * + * @param conf + * @param snapshotScans + * @param restoreDir + * @throws IOException + */ + public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans, + Path restoreDir) throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + setSnapshotToScans(conf, snapshotScans); + Map<String, Path> restoreDirs = + generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir); + setSnapshotDirs(conf, restoreDirs); + restoreSnapshots(conf, restoreDirs, fs); + } + + /** + * Return the list of splits extracted from the scans/snapshots pushed to conf by + * {@link + * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)} + * + * @param conf Configuration to determine splits from + * @return Return the list of splits extracted from the scans/snapshots pushed to conf + * @throws IOException + */ + public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf) + throws IOException { + Path rootDir = FSUtils.getRootDir(conf); + FileSystem fs = rootDir.getFileSystem(conf); + + List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList(); + + Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf); + Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf); + for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) { + String snapshotName = entry.getKey(); + + Path restoreDir = snapshotsToRestoreDirs.get(snapshotName); + + SnapshotManifest manifest = + TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs); + List<HRegionInfo> regionInfos = + TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); + + for (Scan scan : entry.getValue()) { + List<TableSnapshotInputFormatImpl.InputSplit> splits = + TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf); + rtn.addAll(splits); + } + } + return rtn; + } + + /** + * Retrieve the snapshot name -> list<scan> mapping pushed to configuration by + * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)} + * + * @param conf Configuration to extract name -> list<scan> mappings from. 
+ * @return the snapshot name -> list<scan> mapping pushed to configuration + * @throws IOException + */ + public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException { + + Map<String, Collection<Scan>> rtn = Maps.newHashMap(); + + for (Map.Entry<String, String> entry : ConfigurationUtil + .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) { + String snapshotName = entry.getKey(); + String scan = entry.getValue(); + + Collection<Scan> snapshotScans = rtn.get(snapshotName); + if (snapshotScans == null) { + snapshotScans = Lists.newArrayList(); + rtn.put(snapshotName, snapshotScans); + } + + snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan)); + } + + return rtn; + } + + /** + * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) + * + * @param conf + * @param snapshotScans + * @throws IOException + */ + public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans) + throws IOException { + // flatten out snapshotScans for serialization to the job conf + List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList(); + + for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) { + String snapshotName = entry.getKey(); + Collection<Scan> scans = entry.getValue(); + + // serialize all scans and map them to the appropriate snapshot + for (Scan scan : scans) { + snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName, + TableMapReduceUtil.convertScanToString(scan))); + } + } + + ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans); + } + + /** + * Retrieve the directories into which snapshots have been restored from + * ({@link #RESTORE_DIRS_KEY}) + * + * @param conf Configuration to extract restore directories from + * @return the directories into which snapshots have been restored from + * @throws IOException + */ + public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException { + List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY); + Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size()); + + for (Map.Entry<String, String> kvp : kvps) { + rtn.put(kvp.getKey(), new Path(kvp.getValue())); + } + + return rtn; + } + + public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) { + Map<String, String> toSet = Maps.newHashMap(); + + for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) { + toSet.put(entry.getKey(), entry.getValue().toString()); + } + + ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet()); + } + + /** + * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and + * return a map from the snapshot to the restore directory. 
+ * + * @param snapshots collection of snapshot names to restore + * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored + * @return a mapping from snapshot name to the directory in which that snapshot has been restored + */ + private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots, + Path baseRestoreDir) { + Map<String, Path> rtn = Maps.newHashMap(); + + for (String snapshotName : snapshots) { + Path restoreSnapshotDir = + new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString()); + rtn.put(snapshotName, restoreSnapshotDir); + } + + return rtn; + } + + /** + * Restore each (snapshot name, restore directory) pair in snapshotToDir + * + * @param conf configuration to restore with + * @param snapshotToDir mapping from snapshot names to restore directories + * @param fs filesystem to do snapshot restoration on + * @throws IOException + */ + public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs) + throws IOException { + // TODO: restore from record readers to parallelize. + Path rootDir = FSUtils.getRootDir(conf); + + for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) { + String snapshotName = entry.getKey(); + Path restoreDir = entry.getValue(); + LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir + + " for MultiTableSnapshotInputFormat"); + restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs); + } + } + + void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, + FileSystem fs) throws IOException { + RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + } + +}
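[Illustrative sketch, not part of the patch: the following driver shows how the class added above might be exercised directly. It uses only the methods defined in this diff (setInput, getSplits); the snapshot names, column family, and restore path are hypothetical, and real jobs would normally go through the mapreduce/mapred MultiTableSnapshotInputFormat wrappers rather than calling the Impl class themselves.]

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiSnapshotScanSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // One or more Scans per snapshot; snapshot names and column family are illustrative.
    Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
    snapshotScans.put("snapshot_a", Arrays.asList(new Scan().addFamily(Bytes.toBytes("cf"))));
    snapshotScans.put("snapshot_b", Arrays.asList(new Scan()));

    // Hypothetical staging directory on the cluster filesystem; each snapshot is
    // restored under a uniquely named subdirectory of this path.
    Path restoreDir = new Path("/tmp/mtsif-restore");

    MultiTableSnapshotInputFormatImpl impl = new MultiTableSnapshotInputFormatImpl();
    // Serializes the scans into the conf (SNAPSHOT_TO_SCANS_KEY), records the
    // per-snapshot restore directories (RESTORE_DIRS_KEY), and restores the snapshots.
    impl.setInput(conf, snapshotScans, restoreDir);

    // Later (normally inside the input format) the splits are recovered from the conf.
    List<TableSnapshotInputFormatImpl.InputSplit> splits = impl.getSplits(conf);
    System.out.println("computed " + splits.size() + " splits");
  }
}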
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java new file mode 100644 index 0000000..a505379 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java @@ -0,0 +1,301 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.util.ReflectionUtils; + + +/** + * Multithreaded implementation for @link org.apache.hbase.mapreduce.TableMapper + * <p> + * It can be used instead when the Map operation is not CPU + * bound in order to improve throughput. + * <p> + * Mapper implementations using this MapRunnable must be thread-safe. + * <p> + * The Map-Reduce job has to be configured with the mapper to use via + * {@link #setMapperClass} and the number of thread the thread-pool can use with the + * {@link #getNumberOfThreads} method. The default value is 10 threads. + * <p> + */ + +public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> { + private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class); + private Class<? 
extends Mapper<ImmutableBytesWritable, Result,K2,V2>> mapClass; + private Context outer; + private ExecutorService executor; + public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads"; + public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass"; + + /** + * The number of threads in the thread pool that will run the map function. + * @param job the job + * @return the number of threads + */ + public static int getNumberOfThreads(JobContext job) { + return job.getConfiguration(). + getInt(NUMBER_OF_THREADS, 10); + } + + /** + * Set the number of threads in the pool for running maps. + * @param job the job to modify + * @param threads the new number of threads + */ + public static void setNumberOfThreads(Job job, int threads) { + job.getConfiguration().setInt(NUMBER_OF_THREADS, + threads); + } + + /** + * Get the application's mapper class. + * @param <K2> the map's output key type + * @param <V2> the map's output value type + * @param job the job + * @return the mapper class to run + */ + @SuppressWarnings("unchecked") + public static <K2,V2> + Class<Mapper<ImmutableBytesWritable, Result,K2,V2>> getMapperClass(JobContext job) { + return (Class<Mapper<ImmutableBytesWritable, Result,K2,V2>>) + job.getConfiguration().getClass( MAPPER_CLASS, + Mapper.class); + } + + /** + * Set the application's mapper class. + * @param <K2> the map output key type + * @param <V2> the map output value type + * @param job the job to modify + * @param cls the class to use as the mapper + */ + public static <K2,V2> + void setMapperClass(Job job, + Class<? extends Mapper<ImmutableBytesWritable, Result,K2,V2>> cls) { + if (MultithreadedTableMapper.class.isAssignableFrom(cls)) { + throw new IllegalArgumentException("Can't have recursive " + + "MultithreadedTableMapper instances."); + } + job.getConfiguration().setClass(MAPPER_CLASS, + cls, Mapper.class); + } + + /** + * Run the application's maps using a thread pool. 
+ */ + @Override + public void run(Context context) throws IOException, InterruptedException { + outer = context; + int numberOfThreads = getNumberOfThreads(context); + mapClass = getMapperClass(context); + if (LOG.isDebugEnabled()) { + LOG.debug("Configuring multithread runner to use " + numberOfThreads + + " threads"); + } + executor = Executors.newFixedThreadPool(numberOfThreads); + for(int i=0; i < numberOfThreads; ++i) { + MapRunner thread = new MapRunner(context); + executor.execute(thread); + } + executor.shutdown(); + while (!executor.isTerminated()) { + // wait till all the threads are done + Thread.sleep(1000); + } + } + + private class SubMapRecordReader + extends RecordReader<ImmutableBytesWritable, Result> { + private ImmutableBytesWritable key; + private Result value; + private Configuration conf; + + @Override + public void close() throws IOException { + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void initialize(InputSplit split, + TaskAttemptContext context + ) throws IOException, InterruptedException { + conf = context.getConfiguration(); + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + synchronized (outer) { + if (!outer.nextKeyValue()) { + return false; + } + key = ReflectionUtils.copy(outer.getConfiguration(), + outer.getCurrentKey(), key); + value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value); + return true; + } + } + + public ImmutableBytesWritable getCurrentKey() { + return key; + } + + @Override + public Result getCurrentValue() { + return value; + } + } + + private class SubMapRecordWriter extends RecordWriter<K2,V2> { + + @Override + public void close(TaskAttemptContext context) throws IOException, + InterruptedException { + } + + @Override + public void write(K2 key, V2 value) throws IOException, + InterruptedException { + synchronized (outer) { + outer.write(key, value); + } + } + } + + private class SubMapStatusReporter extends StatusReporter { + + @Override + public Counter getCounter(Enum<?> name) { + return outer.getCounter(name); + } + + @Override + public Counter getCounter(String group, String name) { + return outer.getCounter(group, name); + } + + @Override + public void progress() { + outer.progress(); + } + + @Override + public void setStatus(String status) { + outer.setStatus(status); + } + + public float getProgress() { + return 0; + } + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION", + justification="Don't understand why FB is complaining about this one. 
We do throw exception") + private class MapRunner implements Runnable { + private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper; + private Context subcontext; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + MapRunner(Context context) throws IOException, InterruptedException { + mapper = ReflectionUtils.newInstance(mapClass, + context.getConfiguration()); + try { + Constructor c = context.getClass().getConstructor( + Mapper.class, + Configuration.class, + TaskAttemptID.class, + RecordReader.class, + RecordWriter.class, + OutputCommitter.class, + StatusReporter.class, + InputSplit.class); + c.setAccessible(true); + subcontext = (Context) c.newInstance( + mapper, + outer.getConfiguration(), + outer.getTaskAttemptID(), + new SubMapRecordReader(), + new SubMapRecordWriter(), + context.getOutputCommitter(), + new SubMapStatusReporter(), + outer.getInputSplit()); + } catch (Exception e) { + try { + Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor( + Configuration.class, + TaskAttemptID.class, + RecordReader.class, + RecordWriter.class, + OutputCommitter.class, + StatusReporter.class, + InputSplit.class); + c.setAccessible(true); + MapContext mc = (MapContext) c.newInstance( + outer.getConfiguration(), + outer.getTaskAttemptID(), + new SubMapRecordReader(), + new SubMapRecordWriter(), + context.getOutputCommitter(), + new SubMapStatusReporter(), + outer.getInputSplit()); + Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper"); + Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class); + subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc); + } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION + // rethrow as IOE + throw new IOException(e); + } + } + } + + @Override + public void run() { + try { + mapper.run(subcontext); + } catch (Throwable ie) { + LOG.error("Problem in running map.", ie); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java new file mode 100644 index 0000000..d5faab5 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MutationSerialization.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + [email protected] +public class MutationSerialization implements Serialization<Mutation> { + @Override + public boolean accept(Class<?> c) { + return Mutation.class.isAssignableFrom(c); + } + + @Override + public Deserializer<Mutation> getDeserializer(Class<Mutation> c) { + return new MutationDeserializer(); + } + + @Override + public Serializer<Mutation> getSerializer(Class<Mutation> c) { + return new MutationSerializer(); + } + + private static class MutationDeserializer implements Deserializer<Mutation> { + private InputStream in; + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public Mutation deserialize(Mutation mutation) throws IOException { + MutationProto proto = MutationProto.parseDelimitedFrom(in); + return ProtobufUtil.toMutation(proto); + } + + @Override + public void open(InputStream in) throws IOException { + this.in = in; + } + + } + private static class MutationSerializer implements Serializer<Mutation> { + private OutputStream out; + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void open(OutputStream out) throws IOException { + this.out = out; + } + + @Override + public void serialize(Mutation mutation) throws IOException { + MutationType type; + if (mutation instanceof Put) { + type = MutationType.PUT; + } else if (mutation instanceof Delete) { + type = MutationType.DELETE; + } else { + throw new IllegalArgumentException("Only Put and Delete are supported"); + } + ProtobufUtil.toMutation(type, mutation).writeDelimitedTo(out); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java new file mode 100644 index 0000000..f01e84f --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutCombiner.java @@ -0,0 +1,98 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.mapreduce.Reducer; + +/** + * Combine Puts. Merges Put instances grouped by <code>K</code> into a single + * instance. + * @see TableMapReduceUtil + */ [email protected] +public class PutCombiner<K> extends Reducer<K, Put, K, Put> { + private static final Log LOG = LogFactory.getLog(PutCombiner.class); + + @Override + protected void reduce(K row, Iterable<Put> vals, Context context) + throws IOException, InterruptedException { + // Using HeapSize to create an upper bound on the memory size of + // the puts and flush some portion of the content while looping. This + // flush could result in multiple Puts for a single rowkey. That is + // acceptable because Combiner is run as an optimization and it's not + // critical that all Puts are grouped perfectly. + long threshold = context.getConfiguration().getLong( + "putcombiner.row.threshold", 1L * (1<<30)); + int cnt = 0; + long curSize = 0; + Put put = null; + Map<byte[], List<Cell>> familyMap = null; + for (Put p : vals) { + cnt++; + if (put == null) { + put = p; + familyMap = put.getFamilyCellMap(); + } else { + for (Entry<byte[], List<Cell>> entry : p.getFamilyCellMap() + .entrySet()) { + List<Cell> cells = familyMap.get(entry.getKey()); + List<Cell> kvs = (cells != null) ? (List<Cell>) cells : null; + for (Cell cell : entry.getValue()) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + curSize += kv.heapSize(); + if (kvs != null) { + kvs.add(kv); + } + } + if (cells == null) { + familyMap.put(entry.getKey(), entry.getValue()); + } + } + if (cnt % 10 == 0) context.setStatus("Combine " + cnt); + if (curSize > threshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); + } + context.write(row, put); + put = null; + curSize = 0; + cnt = 0; + } + } + } + if (put != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Combined %d Put(s) into %d.", cnt, 1)); + } + context.write(row, put); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java new file mode 100644 index 0000000..17ab9cb --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PutSortReducer.java @@ -0,0 +1,147 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ArrayBackedTag; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.TagUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.security.visibility.CellVisibility; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.util.StringUtils; + +/** + * Emits sorted Puts. + * Reads in all Puts from passed Iterator, sorts them, then emits + * Puts in sorted order. If lots of columns per row, it will use lots of + * memory sorting. 
+ * @see HFileOutputFormat2 + * @see KeyValueSortReducer + */ [email protected] +public class PutSortReducer extends + Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> { + // the cell creator + private CellCreator kvCreator; + + @Override + protected void + setup(Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue>.Context context) + throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + this.kvCreator = new CellCreator(conf); + } + + @Override + protected void reduce( + ImmutableBytesWritable row, + java.lang.Iterable<Put> puts, + Reducer<ImmutableBytesWritable, Put, + ImmutableBytesWritable, KeyValue>.Context context) + throws java.io.IOException, InterruptedException + { + // although reduce() is called per-row, handle pathological case + long threshold = context.getConfiguration().getLong( + "putsortreducer.row.threshold", 1L * (1<<30)); + Iterator<Put> iter = puts.iterator(); + while (iter.hasNext()) { + TreeSet<KeyValue> map = new TreeSet<>(CellComparator.COMPARATOR); + long curSize = 0; + // stop at the end or the RAM threshold + List<Tag> tags = new ArrayList<>(); + while (iter.hasNext() && curSize < threshold) { + // clear the tags + tags.clear(); + Put p = iter.next(); + long t = p.getTTL(); + if (t != Long.MAX_VALUE) { + // add TTL tag if found + tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(t))); + } + byte[] acl = p.getACL(); + if (acl != null) { + // add ACL tag if found + tags.add(new ArrayBackedTag(TagType.ACL_TAG_TYPE, acl)); + } + try { + CellVisibility cellVisibility = p.getCellVisibility(); + if (cellVisibility != null) { + // add the visibility labels if any + tags.addAll(kvCreator.getVisibilityExpressionResolver() + .createVisibilityExpTags(cellVisibility.getExpression())); + } + } catch (DeserializationException e) { + // We just throw exception here. Should we allow other mutations to proceed by + // just ignoring the bad one? + throw new IOException("Invalid visibility expression found in mutation " + p, e); + } + for (List<Cell> cells: p.getFamilyCellMap().values()) { + for (Cell cell: cells) { + // Creating the KV which needs to be directly written to HFiles. Using the Facade + // KVCreator for creation of kvs. 
+ KeyValue kv = null; + TagUtil.carryForwardTags(tags, cell); + if (!tags.isEmpty()) { + kv = (KeyValue) kvCreator.create(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength(), cell.getTimestamp(), cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength(), tags); + } else { + kv = KeyValueUtil.ensureKeyValue(cell); + } + if (map.add(kv)) {// don't count duplicated kv into size + curSize += kv.heapSize(); + } + } + } + } + context.setStatus("Read " + map.size() + " entries of " + map.getClass() + + "(" + StringUtils.humanReadableInt(curSize) + ")"); + int index = 0; + for (KeyValue kv : map) { + context.write(row, kv); + if (++index % 100 == 0) + context.setStatus("Wrote " + index); + } + + // if we have more entries to process + if (iter.hasNext()) { + // force flush because we cannot guarantee intra-row sorted order + context.write(null, null); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java new file mode 100644 index 0000000..f14cd90 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RegionSizeCalculator.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLoad; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Computes size of each region for given table and given column families. + * The value is used by MapReduce for better scheduling. + * */ [email protected] +public class RegionSizeCalculator { + + private static final Log LOG = LogFactory.getLog(RegionSizeCalculator.class); + + /** + * Maps each region to its size in bytes. 
+ * */ + private final Map<byte[], Long> sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + static final String ENABLE_REGIONSIZECALCULATOR = "hbase.regionsizecalculator.enable"; + private static final long MEGABYTE = 1024L * 1024L; + + /** + * Computes size of each region for table and given column families. + * */ + public RegionSizeCalculator(RegionLocator regionLocator, Admin admin) throws IOException { + init(regionLocator, admin); + } + + private void init(RegionLocator regionLocator, Admin admin) + throws IOException { + if (!enabled(admin.getConfiguration())) { + LOG.info("Region size calculation disabled."); + return; + } + + if (regionLocator.getName().isSystemTable()) { + LOG.info("Region size calculation disabled for system tables."); + return; + } + + LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\"."); + + // Get the servers which host regions of the table + Set<ServerName> tableServers = getRegionServersOfTable(regionLocator); + + for (ServerName tableServerName : tableServers) { + Map<byte[], RegionLoad> regionLoads = + admin.getRegionLoad(tableServerName, regionLocator.getName()); + for (RegionLoad regionLoad : regionLoads.values()) { + + byte[] regionId = regionLoad.getName(); + long regionSizeBytes = regionLoad.getStorefileSizeMB() * MEGABYTE; + sizeMap.put(regionId, regionSizeBytes); + + if (LOG.isDebugEnabled()) { + LOG.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes); + } + } + } + LOG.debug("Region sizes calculated"); + } + + private Set<ServerName> getRegionServersOfTable(RegionLocator regionLocator) + throws IOException { + + Set<ServerName> tableServers = Sets.newHashSet(); + for (HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) { + tableServers.add(regionLocation.getServerName()); + } + return tableServers; + } + + boolean enabled(Configuration configuration) { + return configuration.getBoolean(ENABLE_REGIONSIZECALCULATOR, true); + } + + /** + * Returns size of given region in bytes. Returns 0 if region was not found. + * */ + public long getRegionSize(byte[] regionId) { + Long size = sizeMap.get(regionId); + if (size == null) { + LOG.debug("Unknown region:" + Arrays.toString(regionId)); + return 0; + } else { + return size; + } + } + + public Map<byte[], Long> getRegionSizeMap() { + return Collections.unmodifiableMap(sizeMap); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java new file mode 100644 index 0000000..dff04b6 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/ResultSerialization.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; + [email protected] +public class ResultSerialization extends Configured implements Serialization<Result> { + private static final Log LOG = LogFactory.getLog(ResultSerialization.class); + // The following configuration property indicates import file format version. + public static final String IMPORT_FORMAT_VER = "hbase.import.version"; + + @Override + public boolean accept(Class<?> c) { + return Result.class.isAssignableFrom(c); + } + + @Override + public Deserializer<Result> getDeserializer(Class<Result> c) { + // check input format version + Configuration conf = getConf(); + if (conf != null) { + String inputVersion = conf.get(IMPORT_FORMAT_VER); + if (inputVersion != null && inputVersion.equals("0.94")) { + LOG.info("Load exported file using deserializer for HBase 0.94 format"); + return new Result94Deserializer(); + } + } + + return new ResultDeserializer(); + } + + @Override + public Serializer<Result> getSerializer(Class<Result> c) { + return new ResultSerializer(); + } + + /** + * The following deserializer class is used to load exported file of 0.94 + */ + private static class Result94Deserializer implements Deserializer<Result> { + private DataInputStream in; + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public Result deserialize(Result mutation) throws IOException { + int totalBuffer = in.readInt(); + if (totalBuffer == 0) { + return Result.EMPTY_RESULT; + } + byte[] buf = new byte[totalBuffer]; + readChunked(in, buf, 0, totalBuffer); + List<Cell> kvs = new ArrayList<>(); + int offset = 0; + while (offset < totalBuffer) { + int keyLength = Bytes.toInt(buf, offset); + offset += Bytes.SIZEOF_INT; + kvs.add(new KeyValue(buf, offset, keyLength)); + offset += keyLength; + } + return Result.create(kvs); + } + + @Override + public void open(InputStream in) throws IOException { + if (!(in instanceof DataInputStream)) { + throw new IOException("Wrong input stream instance passed in"); + } + this.in = (DataInputStream) in; + } + + private void readChunked(final DataInput in, byte[] dest, int ofs, int len) throws IOException { + int maxRead = 8192; + + for (; ofs < len; ofs += maxRead) + 
in.readFully(dest, ofs, Math.min(len - ofs, maxRead)); + } + } + + private static class ResultDeserializer implements Deserializer<Result> { + private InputStream in; + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public Result deserialize(Result mutation) throws IOException { + ClientProtos.Result proto = ClientProtos.Result.parseDelimitedFrom(in); + return ProtobufUtil.toResult(proto); + } + + @Override + public void open(InputStream in) throws IOException { + this.in = in; + } + } + + private static class ResultSerializer implements Serializer<Result> { + private OutputStream out; + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public void open(OutputStream out) throws IOException { + this.out = out; + } + + @Override + public void serialize(Result result) throws IOException { + ProtobufUtil.toResult(result).writeDelimitedTo(out); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java new file mode 100644 index 0000000..2e0591e --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/RowCounter.java @@ -0,0 +1,265 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.MultiRowRangeFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A job with a just a map phase to count rows. Map outputs table rows IF the + * input row has columns that have content. 
+ */ [email protected] +public class RowCounter extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(RowCounter.class); + + /** Name of this 'program'. */ + static final String NAME = "rowcounter"; + + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + private final static String EXPECTED_COUNT_KEY = RowCounter.class.getName() + ".expected_count"; + + /** + * Mapper that runs the count. + */ + static class RowCounterMapper + extends TableMapper<ImmutableBytesWritable, Result> { + + /** Counter enumeration to count the actual rows. */ + public static enum Counters {ROWS} + + /** + * Maps the data. + * + * @param row The current table row key. + * @param values The columns. + * @param context The current context. + * @throws IOException When something is broken with the data. + * @see org.apache.hadoop.mapreduce.Mapper#map(Object, Object, Context) + */ + @Override + public void map(ImmutableBytesWritable row, Result values, + Context context) + throws IOException { + // Count every row containing data, whether it's in qualifiers or values + context.getCounter(Counters.ROWS).increment(1); + } + } + + /** + * Sets up the actual job. + * + * @param conf The current configuration. + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. + */ + public static Job createSubmittableJob(Configuration conf, String[] args) + throws IOException { + String tableName = args[0]; + List<MultiRowRangeFilter.RowRange> rowRangeList = null; + long startTime = 0; + long endTime = 0; + + StringBuilder sb = new StringBuilder(); + + final String rangeSwitch = "--range="; + final String startTimeArgKey = "--starttime="; + final String endTimeArgKey = "--endtime="; + final String expectedCountArg = "--expected-count="; + + // First argument is table name, starting from second + for (int i = 1; i < args.length; i++) { + if (args[i].startsWith(rangeSwitch)) { + try { + rowRangeList = parseRowRangeParameter(args[i], rangeSwitch); + } catch (IllegalArgumentException e) { + return null; + } + continue; + } + if (args[i].startsWith(startTimeArgKey)) { + startTime = Long.parseLong(args[i].substring(startTimeArgKey.length())); + continue; + } + if (args[i].startsWith(endTimeArgKey)) { + endTime = Long.parseLong(args[i].substring(endTimeArgKey.length())); + continue; + } + if (args[i].startsWith(expectedCountArg)) { + conf.setLong(EXPECTED_COUNT_KEY, + Long.parseLong(args[i].substring(expectedCountArg.length()))); + continue; + } + // if no switch, assume column names + sb.append(args[i]); + sb.append(" "); + } + if (endTime < startTime) { + printUsage("--endtime=" + endTime + " needs to be greater than --starttime=" + startTime); + return null; + } + + Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName)); + job.setJarByClass(RowCounter.class); + Scan scan = new Scan(); + scan.setCacheBlocks(false); + setScanFilter(scan, rowRangeList); + if (sb.length() > 0) { + for (String columnName : sb.toString().trim().split(" ")) { + String family = StringUtils.substringBefore(columnName, ":"); + String qualifier = StringUtils.substringAfter(columnName, ":"); + + if (StringUtils.isBlank(qualifier)) { + scan.addFamily(Bytes.toBytes(family)); + } + else { + scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); + } + } + } + scan.setTimeRange(startTime, endTime == 0 ? 
HConstants.LATEST_TIMESTAMP : endTime); + job.setOutputFormatClass(NullOutputFormat.class); + TableMapReduceUtil.initTableMapperJob(tableName, scan, + RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job); + job.setNumReduceTasks(0); + return job; + } + + private static List<MultiRowRangeFilter.RowRange> parseRowRangeParameter( + String arg, String rangeSwitch) { + final String[] ranges = arg.substring(rangeSwitch.length()).split(";"); + final List<MultiRowRangeFilter.RowRange> rangeList = new ArrayList<>(); + for (String range : ranges) { + String[] startEnd = range.split(",", 2); + if (startEnd.length != 2 || startEnd[1].contains(",")) { + printUsage("Please specify range in such format as \"--range=a,b\" " + + "or, with only one boundary, \"--range=,b\" or \"--range=a,\""); + throw new IllegalArgumentException("Wrong range specification: " + range); + } + String startKey = startEnd[0]; + String endKey = startEnd[1]; + rangeList.add(new MultiRowRangeFilter.RowRange( + Bytes.toBytesBinary(startKey), true, + Bytes.toBytesBinary(endKey), false)); + } + return rangeList; + } + + /** + * Sets filter {@link FilterBase} to the {@link Scan} instance. + * If provided rowRangeList contains more than one element, + * method sets filter which is instance of {@link MultiRowRangeFilter}. + * Otherwise, method sets filter which is instance of {@link FirstKeyOnlyFilter}. + * If rowRangeList contains exactly one element, startRow and stopRow are set to the scan. + * @param scan + * @param rowRangeList + */ + private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) { + final int size = rowRangeList == null ? 0 : rowRangeList.size(); + if (size <= 1) { + scan.setFilter(new FirstKeyOnlyFilter()); + } + if (size == 1) { + MultiRowRangeFilter.RowRange range = rowRangeList.get(0); + scan.setStartRow(range.getStartRow()); //inclusive + scan.setStopRow(range.getStopRow()); //exclusive + } else if (size > 1) { + scan.setFilter(new MultiRowRangeFilter(rowRangeList)); + } + } + + /* + * @param errorMessage Can attach a message when error occurs. + */ + private static void printUsage(String errorMessage) { + System.err.println("ERROR: " + errorMessage); + printUsage(); + } + + /** + * Prints usage without error message. + * Note that we don't document --expected-count, because it's intended for test. + */ + private static void printUsage() { + System.err.println("Usage: RowCounter [options] <tablename> " + + "[--starttime=[start] --endtime=[end] " + + "[--range=[startKey],[endKey][;[startKey],[endKey]...]] [<column1> <column2>...]"); + System.err.println("For performance consider the following options:\n" + + "-Dhbase.client.scanner.caching=100\n" + + "-Dmapreduce.map.speculative=false"); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 1) { + printUsage("Wrong number of parameters: " + args.length); + return -1; + } + Job job = createSubmittableJob(getConf(), args); + if (job == null) { + return -1; + } + boolean success = job.waitForCompletion(true); + final long expectedCount = getConf().getLong(EXPECTED_COUNT_KEY, -1); + if (success && expectedCount != -1) { + final Counter counter = job.getCounters().findCounter(RowCounterMapper.Counters.ROWS); + success = expectedCount == counter.getValue(); + if (!success) { + LOG.error("Failing job because count of '" + counter.getValue() + + "' does not match expected count of '" + expectedCount + "'"); + } + } + return (success ? 0 : 1); + } + + /** + * Main entry point. 
+ * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int errCode = ToolRunner.run(HBaseConfiguration.create(), new RowCounter(), args); + System.exit(errCode); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java ---------------------------------------------------------------------- diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java new file mode 100644 index 0000000..01a919c --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SimpleTotalOrderPartitioner.java @@ -0,0 +1,143 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Base64; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Partitioner; + +/** + * A partitioner that takes start and end keys and uses bigdecimal to figure + * which reduce a key belongs to. Pass the start and end + * keys in the Configuration using <code>hbase.simpletotalorder.start</code> + * and <code>hbase.simpletotalorder.end</code>. The end key needs to be + * exclusive; i.e. one larger than the biggest key in your key space. + * You may be surprised at how this class partitions the space; it may not + * align with preconceptions; e.g. a start key of zero and an end key of 100 + * divided in ten will not make regions whose range is 0-10, 10-20, and so on. + * Make your own partitioner if you need the region spacing to come out a + * particular way. 
+ * @param <VALUE> + * @see #START + * @see #END + */ [email protected] +public class SimpleTotalOrderPartitioner<VALUE> extends Partitioner<ImmutableBytesWritable, VALUE> +implements Configurable { + private final static Log LOG = LogFactory.getLog(SimpleTotalOrderPartitioner.class); + + @Deprecated + public static final String START = "hbase.simpletotalorder.start"; + @Deprecated + public static final String END = "hbase.simpletotalorder.end"; + + static final String START_BASE64 = "hbase.simpletotalorder.start.base64"; + static final String END_BASE64 = "hbase.simpletotalorder.end.base64"; + + private Configuration c; + private byte [] startkey; + private byte [] endkey; + private byte [][] splits; + private int lastReduces = -1; + + public static void setStartKey(Configuration conf, byte[] startKey) { + conf.set(START_BASE64, Base64.encodeBytes(startKey)); + } + + public static void setEndKey(Configuration conf, byte[] endKey) { + conf.set(END_BASE64, Base64.encodeBytes(endKey)); + } + + @SuppressWarnings("deprecation") + static byte[] getStartKey(Configuration conf) { + return getKeyFromConf(conf, START_BASE64, START); + } + + @SuppressWarnings("deprecation") + static byte[] getEndKey(Configuration conf) { + return getKeyFromConf(conf, END_BASE64, END); + } + + private static byte[] getKeyFromConf(Configuration conf, + String base64Key, String deprecatedKey) { + String encoded = conf.get(base64Key); + if (encoded != null) { + return Base64.decode(encoded); + } + String oldStyleVal = conf.get(deprecatedKey); + if (oldStyleVal == null) { + return null; + } + LOG.warn("Using deprecated configuration " + deprecatedKey + + " - please use static accessor methods instead."); + return Bytes.toBytesBinary(oldStyleVal); + } + + @Override + public int getPartition(final ImmutableBytesWritable key, final VALUE value, + final int reduces) { + if (reduces == 1) return 0; + if (this.lastReduces != reduces) { + this.splits = Bytes.split(this.startkey, this.endkey, reduces - 1); + for (int i = 0; i < splits.length; i++) { + LOG.info(Bytes.toStringBinary(splits[i])); + } + this.lastReduces = reduces; + } + int pos = Bytes.binarySearch(this.splits, key.get(), key.getOffset(), + key.getLength()); + // Below code is from hfile index search. + if (pos < 0) { + pos++; + pos *= -1; + if (pos == 0) { + // falls before the beginning of the file. + throw new RuntimeException("Key outside start/stop range: " + + key.toString()); + } + pos--; + } + return pos; + } + + @Override + public Configuration getConf() { + return this.c; + } + + @Override + public void setConf(Configuration conf) { + this.c = conf; + this.startkey = getStartKey(conf); + this.endkey = getEndKey(conf); + if (startkey == null || endkey == null) { + throw new RuntimeException(this.getClass() + " not configured"); + } + LOG.info("startkey=" + Bytes.toStringBinary(startkey) + + ", endkey=" + Bytes.toStringBinary(endkey)); + // Reset last reduces count on change of Start / End key + this.lastReduces = -1; + } +}
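[Illustrative sketch, not part of the patch: a minimal job configuration for the partitioner above, using the setStartKey/setEndKey accessors from this diff. The row keys and reducer count are hypothetical; per the class javadoc, the end key must be exclusive, i.e. one larger than the largest key in the key space.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class TotalOrderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Illustrative key range; the end key is exclusive.
    SimpleTotalOrderPartitioner.setStartKey(conf, Bytes.toBytes("row-0000"));
    SimpleTotalOrderPartitioner.setEndKey(conf, Bytes.toBytes("row-9999"));

    Job job = Job.getInstance(conf, "total-order-sketch");
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    job.setNumReduceTasks(4);

    // The partitioner splits [start, end) into as many ranges as there are
    // reducers and routes each map output key to the range containing it.
  }
}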
