http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java deleted file mode 100644 index bf11473..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ /dev/null @@ -1,412 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.ClientSideRegionScanner; -import org.apache.hadoop.hbase.client.IsolationLevel; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; -import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; -import org.apache.hadoop.hbase.snapshot.SnapshotManifest; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.io.Writable; - -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -/** - * Hadoop MR API-agnostic implementation for mapreduce over table snapshots. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class TableSnapshotInputFormatImpl { - // TODO: Snapshot files are owned in fs by the hbase user. There is no - // easy way to delegate access. - - public static final Log LOG = LogFactory.getLog(TableSnapshotInputFormatImpl.class); - - private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name"; - // key for specifying the root dir of the restored snapshot - protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir"; - - /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */ - private static final String LOCALITY_CUTOFF_MULTIPLIER = - "hbase.tablesnapshotinputformat.locality.cutoff.multiplier"; - private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f; - - /** - * Implementation class for InputSplit logic common between mapred and mapreduce. - */ - public static class InputSplit implements Writable { - - private TableDescriptor htd; - private HRegionInfo regionInfo; - private String[] locations; - private String scan; - private String restoreDir; - - // constructor for mapreduce framework / Writable - public InputSplit() {} - - public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations, - Scan scan, Path restoreDir) { - this.htd = htd; - this.regionInfo = regionInfo; - if (locations == null || locations.isEmpty()) { - this.locations = new String[0]; - } else { - this.locations = locations.toArray(new String[locations.size()]); - } - try { - this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : ""; - } catch (IOException e) { - LOG.warn("Failed to convert Scan to String", e); - } - - this.restoreDir = restoreDir.toString(); - } - - public TableDescriptor getHtd() { - return htd; - } - - public String getScan() { - return scan; - } - - public String getRestoreDir() { - return restoreDir; - } - - public long getLength() { - //TODO: We can obtain the file sizes of the snapshot here. - return 0; - } - - public String[] getLocations() { - return locations; - } - - public TableDescriptor getTableDescriptor() { - return htd; - } - - public HRegionInfo getRegionInfo() { - return regionInfo; - } - - // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of - // doing this wrapping with Writables.
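The write()/readFields() pair that follows implements exactly the wrapping the TODO above describes: the protobuf split message is serialized to a byte array and framed with a length prefix so it can travel inside a Writable. A minimal standalone sketch of that framing pattern, using plain JDK streams with a byte string standing in for the protobuf message (all names here are illustrative, not from the patch):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Frame an opaque serialized message with a length prefix, the same shape
// as TableSnapshotRegionSplit inside InputSplit.write()/readFields().
public class LengthPrefixedFraming {
  static void writeFramed(DataOutputStream out, byte[] message) throws IOException {
    out.writeInt(message.length); // length prefix, mirrors out.writeInt(buf.length)
    out.write(message);           // then the raw message bytes
  }

  static byte[] readFramed(DataInputStream in) throws IOException {
    int len = in.readInt();       // read the prefix first
    byte[] buf = new byte[len];
    in.readFully(buf);            // then exactly len bytes of message
    return buf;
  }

  public static void main(String[] args) throws IOException {
    byte[] message = "pretend-protobuf-bytes".getBytes("UTF-8");
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    writeFramed(new DataOutputStream(baos), message);
    byte[] back = readFramed(
        new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
    System.out.println(new String(back, "UTF-8"));
  }
}
```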
- @Override - public void write(DataOutput out) throws IOException { - TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder() - .setTable(ProtobufUtil.toTableSchema(htd)) - .setRegion(HRegionInfo.convert(regionInfo)); - - for (String location : locations) { - builder.addLocations(location); - } - - TableSnapshotRegionSplit split = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - split.writeTo(baos); - baos.close(); - byte[] buf = baos.toByteArray(); - out.writeInt(buf.length); - out.write(buf); - - Bytes.writeByteArray(out, Bytes.toBytes(scan)); - Bytes.writeByteArray(out, Bytes.toBytes(restoreDir)); - - } - - @Override - public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - byte[] buf = new byte[len]; - in.readFully(buf); - TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf); - this.htd = ProtobufUtil.toTableDescriptor(split.getTable()); - this.regionInfo = HRegionInfo.convert(split.getRegion()); - List<String> locationsList = split.getLocationsList(); - this.locations = locationsList.toArray(new String[locationsList.size()]); - - this.scan = Bytes.toString(Bytes.readByteArray(in)); - this.restoreDir = Bytes.toString(Bytes.readByteArray(in)); - } - } - - /** - * Implementation class for RecordReader logic common between mapred and mapreduce. - */ - public static class RecordReader { - private InputSplit split; - private Scan scan; - private Result result = null; - private ImmutableBytesWritable row = null; - private ClientSideRegionScanner scanner; - - public ClientSideRegionScanner getScanner() { - return scanner; - } - - public void initialize(InputSplit split, Configuration conf) throws IOException { - this.scan = TableMapReduceUtil.convertStringToScan(split.getScan()); - this.split = split; - TableDescriptor htd = split.htd; - HRegionInfo hri = this.split.getRegionInfo(); - FileSystem fs = FSUtils.getCurrentFileSystem(conf); - - - // region is immutable, this should be fine, - // otherwise we have to set the thread read point - scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - // disable caching of data blocks - scan.setCacheBlocks(false); - - scanner = - new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null); - } - - public boolean nextKeyValue() throws IOException { - result = scanner.next(); - if (result == null) { - //we are done - return false; - } - - if (this.row == null) { - this.row = new ImmutableBytesWritable(); - } - this.row.set(result.getRow()); - return true; - } - - public ImmutableBytesWritable getCurrentKey() { - return row; - } - - public Result getCurrentValue() { - return result; - } - - public long getPos() { - return 0; - } - - public float getProgress() { - return 0; // TODO: use total bytes to estimate - } - - public void close() { - if (this.scanner != null) { - this.scanner.close(); - } - } - } - - public static List<InputSplit> getSplits(Configuration conf) throws IOException { - String snapshotName = getSnapshotName(conf); - - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs); - - List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest); - - // TODO: mapred does not support scan as input API. Work around for now. 
- Scan scan = extractScanFromConf(conf); - // the temp dir where the snapshot is restored - Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY)); - - return getSplits(scan, manifest, regionInfos, restoreDir, conf); - } - - public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) { - List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests(); - if (regionManifests == null) { - throw new IllegalArgumentException("Snapshot seems empty"); - } - - List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size()); - - for (SnapshotRegionManifest regionManifest : regionManifests) { - HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo()); - if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) { - continue; - } - regionInfos.add(hri); - } - return regionInfos; - } - - public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName, - Path rootDir, FileSystem fs) throws IOException { - Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); - SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); - return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); - } - - public static Scan extractScanFromConf(Configuration conf) throws IOException { - Scan scan = null; - if (conf.get(TableInputFormat.SCAN) != null) { - scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN)); - } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) { - String[] columns = - conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" "); - scan = new Scan(); - for (String col : columns) { - scan.addFamily(Bytes.toBytes(col)); - } - } else { - throw new IllegalArgumentException("Unable to create scan"); - } - return scan; - } - - public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest, - List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException { - // load table descriptor - TableDescriptor htd = manifest.getTableDescriptor(); - - Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName()); - - List<InputSplit> splits = new ArrayList<>(); - for (HRegionInfo hri : regionManifests) { - // load region descriptor - - if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), - hri.getEndKey())) { - // compute HDFS locations from snapshot files (which will get the locations for - // referred hfiles) - List<String> hosts = getBestLocations(conf, - HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir)); - - int len = Math.min(3, hosts.size()); - hosts = hosts.subList(0, len); - splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); - } - } - - return splits; - - } - - /** - * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers does not take - * weights into account, thus will treat every location passed from the input split as equal. We - * do not want to blindly pass all the locations, since we are creating one split per region, and - * the region's blocks are all distributed throughout the cluster unless favorite node assignment - * is used. On the expected stable case, only one location will contain most of the blocks as - * local. - * On the other hand, in favored node assignment, 3 nodes will contain highly local blocks. 
Here - * we are doing a simple heuristic, where we will pass all hosts which have at least 80% - * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top - * host with the best locality. - */ - public static List<String> getBestLocations( - Configuration conf, HDFSBlocksDistribution blockDistribution) { - List<String> locations = new ArrayList<>(3); - - HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights(); - - if (hostAndWeights.length == 0) { - return locations; - } - - HostAndWeight topHost = hostAndWeights[0]; - locations.add(topHost.getHost()); - - // Heuristic: filter all hosts which have at least cutoffMultiplier % of block locality - double cutoffMultiplier - = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER); - - double filterWeight = topHost.getWeight() * cutoffMultiplier; - - for (int i = 1; i < hostAndWeights.length; i++) { - if (hostAndWeights[i].getWeight() >= filterWeight) { - locations.add(hostAndWeights[i].getHost()); - } else { - break; - } - } - - return locations; - } - - private static String getSnapshotName(Configuration conf) { - String snapshotName = conf.get(SNAPSHOT_NAME_KEY); - if (snapshotName == null) { - throw new IllegalArgumentException("Snapshot name must be provided"); - } - return snapshotName; - } - - /** - * Configures the job to use TableSnapshotInputFormat to read from a snapshot. - * @param conf the job to configuration - * @param snapshotName the name of the snapshot to read from - * @param restoreDir a temporary directory to restore the snapshot into. Current user should - * have write permissions to this directory, and this should not be a subdirectory of rootdir. - * After the job is finished, restoreDir can be deleted. - * @throws IOException if an error occurs - */ - public static void setInput(Configuration conf, String snapshotName, Path restoreDir) - throws IOException { - conf.set(SNAPSHOT_NAME_KEY, snapshotName); - - Path rootDir = FSUtils.getRootDir(conf); - FileSystem fs = rootDir.getFileSystem(conf); - - restoreDir = new Path(restoreDir, UUID.randomUUID().toString()); - - // TODO: restore from record readers to parallelize. - RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); - - conf.set(RESTORE_DIR_KEY, restoreDir.toString()); - } -}
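For orientation, this is how a driver would typically wire the deleted class up: setInput() restores the snapshot into a scratch directory and records the snapshot name and restore dir in the configuration, after which getSplits(conf) can produce one split per region. A hedged sketch only; the snapshot name and path are example values, and it assumes the code compiles in the same package as TableSnapshotInputFormatImpl:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical driver fragment for a snapshot-backed MR job.
public class SnapshotInputDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Must be writable by the submitting user and must not sit under
    // hbase.rootdir; setInput() appends a random UUID subdirectory.
    Path restoreDir = new Path("/tmp/snapshot-restore");
    TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", restoreDir);
    // ... build and submit the job; its input format delegates to getSplits(conf).
    // The restore directory can be deleted once the job finishes.
  }
}
```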
http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java deleted file mode 100644 index 13c7c67..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSplit.java +++ /dev/null @@ -1,395 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.InputSplit; - -/** - * A table split corresponds to a key range (low, high) and an optional scanner. - * All references to row below refer to the key of the row. - */ -@InterfaceAudience.Public -public class TableSplit extends InputSplit -implements Writable, Comparable<TableSplit> { - /** @deprecated LOG variable would be made private. fix in hbase 3.0 */ - @Deprecated - public static final Log LOG = LogFactory.getLog(TableSplit.class); - - // should be < 0 (@see #readFields(DataInput)) - // version 1 supports Scan data member - enum Version { - UNVERSIONED(0), - // Initial number we put on TableSplit when we introduced versioning.
- INITIAL(-1), - // Added an encoded region name field for easier identification of split -> region - WITH_ENCODED_REGION_NAME(-2); - - final int code; - static final Version[] byCode; - static { - byCode = Version.values(); - for (int i = 0; i < byCode.length; i++) { - if (byCode[i].code != -1 * i) { - throw new AssertionError("Values in this enum should be descending by one"); - } - } - } - - Version(int code) { - this.code = code; - } - - boolean atLeast(Version other) { - return code <= other.code; - } - - static Version fromCode(int code) { - return byCode[code * -1]; - } - } - - private static final Version VERSION = Version.WITH_ENCODED_REGION_NAME; - private TableName tableName; - private byte [] startRow; - private byte [] endRow; - private String regionLocation; - private String encodedRegionName = ""; - private String scan = ""; // stores the serialized form of the Scan - private long length; // Contains estimation of region size in bytes - - /** Default constructor. */ - public TableSplit() { - this((TableName)null, null, HConstants.EMPTY_BYTE_ARRAY, - HConstants.EMPTY_BYTE_ARRAY, ""); - } - - /** - * Creates a new instance while assigning all variables. - * Length of region is set to 0 - * Encoded name of the region is set to blank - * - * @param tableName The name of the current table. - * @param scan The scan associated with this split. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param location The location of the region. - */ - public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, - final String location) { - this(tableName, scan, startRow, endRow, location, 0L); - } - - /** - * Creates a new instance while assigning all variables. - * Encoded name of region is set to blank - * - * @param tableName The name of the current table. - * @param scan The scan associated with this split. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param location The location of the region. - */ - public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, - final String location, long length) { - this(tableName, scan, startRow, endRow, location, "", length); - } - - /** - * Creates a new instance while assigning all variables. - * - * @param tableName The name of the current table. - * @param scan The scan associated with this split. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param encodedRegionName The region ID. - * @param location The location of the region. - */ - public TableSplit(TableName tableName, Scan scan, byte [] startRow, byte [] endRow, - final String location, final String encodedRegionName, long length) { - this.tableName = tableName; - try { - this.scan = - (null == scan) ? "" : TableMapReduceUtil.convertScanToString(scan); - } catch (IOException e) { - LOG.warn("Failed to convert Scan to String", e); - } - this.startRow = startRow; - this.endRow = endRow; - this.regionLocation = location; - this.encodedRegionName = encodedRegionName; - this.length = length; - } - - /** - * Creates a new instance without a scanner. - * Length of region is set to 0 - * - * @param tableName The name of the current table. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param location The location of the region. 
- */ - public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, - final String location) { - this(tableName, null, startRow, endRow, location); - } - - /** - * Creates a new instance without a scanner. - * - * @param tableName The name of the current table. - * @param startRow The start row of the split. - * @param endRow The end row of the split. - * @param location The location of the region. - * @param length Size of region in bytes - */ - public TableSplit(TableName tableName, byte[] startRow, byte[] endRow, - final String location, long length) { - this(tableName, null, startRow, endRow, location, length); - } - - /** - * Returns a Scan object from the stored string representation. - * - * @return Returns a Scan object based on the stored scanner. - * @throws IOException - */ - public Scan getScan() throws IOException { - return TableMapReduceUtil.convertStringToScan(this.scan); - } - - /** - * Returns the table name converted to a byte array. - * @see #getTable() - * @return The table name. - */ - public byte [] getTableName() { - return tableName.getName(); - } - - /** - * Returns the table name. - * - * @return The table name. - */ - public TableName getTable() { - // It is ugly that usually to get a TableName, the method is called getTableName. We can't do - // that in here though because there was an existing getTableName in place already since - // deprecated. - return tableName; - } - - /** - * Returns the start row. - * - * @return The start row. - */ - public byte [] getStartRow() { - return startRow; - } - - /** - * Returns the end row. - * - * @return The end row. - */ - public byte [] getEndRow() { - return endRow; - } - - /** - * Returns the region location. - * - * @return The region's location. - */ - public String getRegionLocation() { - return regionLocation; - } - - /** - * Returns the region's location as an array. - * - * @return The array containing the region location. - * @see org.apache.hadoop.mapreduce.InputSplit#getLocations() - */ - @Override - public String[] getLocations() { - return new String[] {regionLocation}; - } - - /** - * Returns the region's encoded name. - * - * @return The region's encoded name. - */ - public String getEncodedRegionName() { - return encodedRegionName; - } - - /** - * Returns the length of the split. - * - * @return The length of the split. - * @see org.apache.hadoop.mapreduce.InputSplit#getLength() - */ - @Override - public long getLength() { - return length; - } - - /** - * Reads the values of each field. - * - * @param in The input to read from. - * @throws IOException When reading the input fails. - */ - @Override - public void readFields(DataInput in) throws IOException { - Version version = Version.UNVERSIONED; - // TableSplit was not versioned in the beginning. - // In order to introduce it now, we make use of the fact - // that tableName was written with Bytes.writeByteArray, - // which encodes the array length as a vint which is >= 0. - // Hence if the vint is >= 0 we have an old version and the vint - // encodes the length of tableName. - // If < 0 we just read the version and the next vint is the length. 
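A standalone sketch of the back-compat trick described above: new writers emit a negative version code before the first field, while old data begins directly with a non-negative length vint, so the sign of the first vint disambiguates the two stream layouts. WritableUtils is hadoop-common; the payload is simplified here to a single byte array, so this is an illustration of the pattern, not TableSplit's full format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

// Version detection via the sign of the first vint, as in TableSplit.
public class VIntVersionTrick {
  static final int VERSION = -2; // e.g. WITH_ENCODED_REGION_NAME

  static void write(DataOutput out, byte[] tableName) throws IOException {
    WritableUtils.writeVInt(out, VERSION);          // negative => versioned stream
    WritableUtils.writeVInt(out, tableName.length); // then the usual length...
    out.write(tableName);                           // ...and bytes
  }

  static void read(DataInput in) throws IOException {
    int version = 0;                      // 0 == unversioned legacy stream
    int len = WritableUtils.readVInt(in);
    if (len < 0) {                        // first vint was really a version code
      version = len;
      len = WritableUtils.readVInt(in);   // the real length follows
    }
    byte[] tableName = new byte[len];
    in.readFully(tableName);
    System.out.println("version " + version + ", table " + new String(tableName, "UTF-8"));
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    write(new DataOutputStream(bos), "t1".getBytes("UTF-8"));
    read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
  }
}
```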
- // @see Bytes#readByteArray(DataInput) - int len = WritableUtils.readVInt(in); - if (len < 0) { - // what we just read was the version - version = Version.fromCode(len); - len = WritableUtils.readVInt(in); - } - byte[] tableNameBytes = new byte[len]; - in.readFully(tableNameBytes); - tableName = TableName.valueOf(tableNameBytes); - startRow = Bytes.readByteArray(in); - endRow = Bytes.readByteArray(in); - regionLocation = Bytes.toString(Bytes.readByteArray(in)); - if (version.atLeast(Version.INITIAL)) { - scan = Bytes.toString(Bytes.readByteArray(in)); - } - length = WritableUtils.readVLong(in); - if (version.atLeast(Version.WITH_ENCODED_REGION_NAME)) { - encodedRegionName = Bytes.toString(Bytes.readByteArray(in)); - } - } - - /** - * Writes the field values to the output. - * - * @param out The output to write to. - * @throws IOException When writing the values to the output fails. - */ - @Override - public void write(DataOutput out) throws IOException { - WritableUtils.writeVInt(out, VERSION.code); - Bytes.writeByteArray(out, tableName.getName()); - Bytes.writeByteArray(out, startRow); - Bytes.writeByteArray(out, endRow); - Bytes.writeByteArray(out, Bytes.toBytes(regionLocation)); - Bytes.writeByteArray(out, Bytes.toBytes(scan)); - WritableUtils.writeVLong(out, length); - Bytes.writeByteArray(out, Bytes.toBytes(encodedRegionName)); - } - - /** - * Returns the details about this instance as a string. - * - * @return The values of this instance as a string. - * @see java.lang.Object#toString() - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("HBase table split("); - sb.append("table name: ").append(tableName); - // null scan input is represented by "" - String printScan = ""; - if (!scan.equals("")) { - try { - // get the real scan here in toString, not the Base64 string - printScan = TableMapReduceUtil.convertStringToScan(scan).toString(); - } - catch (IOException e) { - printScan = ""; - } - } - sb.append(", scan: ").append(printScan); - sb.append(", start row: ").append(Bytes.toStringBinary(startRow)); - sb.append(", end row: ").append(Bytes.toStringBinary(endRow)); - sb.append(", region location: ").append(regionLocation); - sb.append(", encoded region name: ").append(encodedRegionName); - sb.append(")"); - return sb.toString(); - } - - /** - * Compares this split against the given one. - * - * @param split The split to compare to. - * @return The result of the comparison. - * @see java.lang.Comparable#compareTo(java.lang.Object) - */ - @Override - public int compareTo(TableSplit split) { - // If The table name of the two splits is the same then compare start row - // otherwise compare based on table names - int tableNameComparison = - getTable().compareTo(split.getTable()); - return tableNameComparison != 0 ? tableNameComparison : Bytes.compareTo( - getStartRow(), split.getStartRow()); - } - - @Override - public boolean equals(Object o) { - if (o == null || !(o instanceof TableSplit)) { - return false; - } - return tableName.equals(((TableSplit)o).tableName) && - Bytes.equals(startRow, ((TableSplit)o).startRow) && - Bytes.equals(endRow, ((TableSplit)o).endRow) && - regionLocation.equals(((TableSplit)o).regionLocation); - } - - @Override - public int hashCode() { - int result = tableName != null ? tableName.hashCode() : 0; - result = 31 * result + (scan != null ? scan.hashCode() : 0); - result = 31 * result + (startRow != null ? Arrays.hashCode(startRow) : 0); - result = 31 * result + (endRow != null ? 
Arrays.hashCode(endRow) : 0); - result = 31 * result + (regionLocation != null ? regionLocation.hashCode() : 0); - result = 31 * result + (encodedRegionName != null ? encodedRegionName.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java deleted file mode 100644 index 84324e2..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ArrayBackedTag; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagType; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Reducer; -import org.apache.hadoop.util.StringUtils; - -/** - * Emits Sorted KeyValues. Parse the passed text and creates KeyValues. Sorts them before emit. 
- * @see HFileOutputFormat2 - * @see KeyValueSortReducer - * @see PutSortReducer - */ -@InterfaceAudience.Public -public class TextSortReducer extends - Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> { - - /** Timestamp for all inserted rows */ - private long ts; - - /** Column separator */ - private String separator; - - /** Should skip bad lines */ - private boolean skipBadLines; - - private Counter badLineCount; - - private ImportTsv.TsvParser parser; - - /** Cell visibility expr **/ - private String cellVisibilityExpr; - - /** Cell TTL */ - private long ttl; - - private CellCreator kvCreator; - - public long getTs() { - return ts; - } - - public boolean getSkipBadLines() { - return skipBadLines; - } - - public Counter getBadLineCount() { - return badLineCount; - } - - public void incrementBadLineCount(int count) { - this.badLineCount.increment(count); - } - - /** - * Handles initializing this class with objects specific to it (i.e., the parser). - * Common initialization that might be leveraged by a subclass is done in - * <code>doSetup</code>. Hence a subclass may choose to override this method - * and call <code>doSetup</code> as well before handling its own custom params. - * - * @param context - */ - @Override - protected void setup(Context context) { - Configuration conf = context.getConfiguration(); - doSetup(context, conf); - - parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); - if (parser.getRowKeyColumnIndex() == -1) { - throw new RuntimeException("No row key column specified"); - } - this.kvCreator = new CellCreator(conf); - } - - /** - * Handles common parameter initialization that a subclass might want to leverage. - * @param context - * @param conf - */ - protected void doSetup(Context context, Configuration conf) { - // If a custom separator has been used, - // decode it back from Base64 encoding. - separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); - if (separator == null) { - separator = ImportTsv.DEFAULT_SEPARATOR; - } else { - separator = new String(Base64.decode(separator)); - } - - // Should never get 0 as we are setting this to a valid value in job configuration.
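The two configuration reads below have driver-side counterparts: the submitting code Base64-encodes the separator (so arbitrary bytes survive the Configuration round trip) and seeds the timestamp key with a nonzero value. A hedged sketch of that setup, assuming the conventional ImportTsv key strings "importtsv.separator" and "importtsv.timestamp" and HBase's own Base64 utility (the one this class imports):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Base64;

// Hypothetical driver-side counterpart to doSetup() above.
public class TsvJobSetupSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The separator travels Base64-encoded; doSetup() decodes it back.
    conf.set("importtsv.separator", Base64.encodeBytes("|".getBytes()));
    // Nonzero timestamp, so the 0 default read below is never what reducers see.
    conf.setLong("importtsv.timestamp", System.currentTimeMillis());
    // ... continue normal ImportTsv job setup ...
  }
}
```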
- ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); - - skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); - badLineCount = context.getCounter("ImportTsv", "Bad Lines"); - } - - @Override - protected void reduce( - ImmutableBytesWritable rowKey, - java.lang.Iterable<Text> lines, - Reducer<ImmutableBytesWritable, Text, - ImmutableBytesWritable, KeyValue>.Context context) - throws java.io.IOException, InterruptedException - { - // although reduce() is called per-row, handle pathological case - long threshold = context.getConfiguration().getLong( - "reducer.row.threshold", 1L * (1<<30)); - Iterator<Text> iter = lines.iterator(); - while (iter.hasNext()) { - Set<KeyValue> kvs = new TreeSet<>(CellComparator.COMPARATOR); - long curSize = 0; - // stop at the end or the RAM threshold - while (iter.hasNext() && curSize < threshold) { - Text line = iter.next(); - byte[] lineBytes = line.getBytes(); - try { - ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, line.getLength()); - // Retrieve timestamp if exists - ts = parsed.getTimestamp(ts); - cellVisibilityExpr = parsed.getCellVisibility(); - ttl = parsed.getCellTTL(); - - // create tags for the parsed line - List<Tag> tags = new ArrayList<>(); - if (cellVisibilityExpr != null) { - tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags( - cellVisibilityExpr)); - } - // Add TTL directly to the KV so we can vary them when packing more than one KV - // into puts - if (ttl > 0) { - tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); - } - for (int i = 0; i < parsed.getColumnCount(); i++) { - if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() - || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() - || i == parser.getCellTTLColumnIndex()) { - continue; - } - // Creating the KV which needs to be directly written to HFiles. Using the Facade - // KVCreator for creation of kvs. - Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), - parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length, - parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes, - parsed.getColumnOffset(i), parsed.getColumnLength(i), tags); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - kvs.add(kv); - curSize += kv.heapSize(); - } - } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException - | InvalidLabelException badLine) { - if (skipBadLines) { - System.err.println("Bad line." 
+ badLine.getMessage()); - incrementBadLineCount(1); - continue; - } - throw new IOException(badLine); - } - } - context.setStatus("Read " + kvs.size() + " entries of " + kvs.getClass() - + "(" + StringUtils.humanReadableInt(curSize) + ")"); - int index = 0; - for (KeyValue kv : kvs) { - context.write(rowKey, kv); - if (++index > 0 && index % 100 == 0) - context.setStatus("Wrote " + index + " key values."); - } - - // if we have more entries to process - if (iter.hasNext()) { - // force flush because we cannot guarantee intra-row sorted order - context.write(null, null); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java deleted file mode 100644 index a9d8e03..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ArrayBackedTag; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagType; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException; -import org.apache.hadoop.hbase.security.visibility.CellVisibility; -import org.apache.hadoop.hbase.security.visibility.InvalidLabelException; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Mapper; - -/** - * Write table content out to files in hdfs. 
- */ -@InterfaceAudience.Public -public class TsvImporterMapper -extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> -{ - - /** Timestamp for all inserted rows */ - protected long ts; - - /** Column separator */ - private String separator; - - /** Should skip bad lines */ - private boolean skipBadLines; - /** Should skip empty columns*/ - private boolean skipEmptyColumns; - private Counter badLineCount; - private boolean logBadLines; - - protected ImportTsv.TsvParser parser; - - protected Configuration conf; - - protected String cellVisibilityExpr; - - protected long ttl; - - protected CellCreator kvCreator; - - private String hfileOutPath; - - /** List of cell tags */ - private List<Tag> tags; - - public long getTs() { - return ts; - } - - public boolean getSkipBadLines() { - return skipBadLines; - } - - public Counter getBadLineCount() { - return badLineCount; - } - - public void incrementBadLineCount(int count) { - this.badLineCount.increment(count); - } - - /** - * Handles initializing this class with objects specific to it (i.e., the parser). - * Common initialization that might be leveraged by a subclass is done in - * <code>doSetup</code>. Hence a subclass may choose to override this method - * and call <code>doSetup</code> as well before handling its own custom params. - * - * @param context - */ - @Override - protected void setup(Context context) { - doSetup(context); - - conf = context.getConfiguration(); - parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), - separator); - if (parser.getRowKeyColumnIndex() == -1) { - throw new RuntimeException("No row key column specified"); - } - this.kvCreator = new CellCreator(conf); - tags = new ArrayList<>(); - } - - /** - * Handles common parameter initialization that a subclass might want to leverage. - * @param context - */ - protected void doSetup(Context context) { - Configuration conf = context.getConfiguration(); - - // If a custom separator has been used, - // decode it back from Base64 encoding. - separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); - if (separator == null) { - separator = ImportTsv.DEFAULT_SEPARATOR; - } else { - separator = new String(Base64.decode(separator)); - } - // Should never get 0 as we are setting this to a valid value in job - // configuration. - ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0); - - skipEmptyColumns = context.getConfiguration().getBoolean( - ImportTsv.SKIP_EMPTY_COLUMNS, false); - skipBadLines = context.getConfiguration().getBoolean( - ImportTsv.SKIP_LINES_CONF_KEY, true); - badLineCount = context.getCounter("ImportTsv", "Bad Lines"); - logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); - hfileOutPath = conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY); - } - - /** - * Convert a line of TSV text into an HBase table row.
- */ - @Override - public void map(LongWritable offset, Text value, - Context context) - throws IOException { - byte[] lineBytes = value.getBytes(); - - try { - ImportTsv.TsvParser.ParsedLine parsed = parser.parse( - lineBytes, value.getLength()); - ImmutableBytesWritable rowKey = - new ImmutableBytesWritable(lineBytes, - parsed.getRowKeyOffset(), - parsed.getRowKeyLength()); - // Retrieve timestamp if exists - ts = parsed.getTimestamp(ts); - cellVisibilityExpr = parsed.getCellVisibility(); - ttl = parsed.getCellTTL(); - - // create tags for the parsed line - if (hfileOutPath != null) { - tags.clear(); - if (cellVisibilityExpr != null) { - tags.addAll(kvCreator.getVisibilityExpressionResolver().createVisibilityExpTags( - cellVisibilityExpr)); - } - // Add TTL directly to the KV so we can vary them when packing more than one KV - // into puts - if (ttl > 0) { - tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); - } - } - Put put = new Put(rowKey.copyBytes()); - for (int i = 0; i < parsed.getColumnCount(); i++) { - if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex() - || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex() - || i == parser.getCellTTLColumnIndex() || (skipEmptyColumns - && parsed.getColumnLength(i) == 0)) { - continue; - } - populatePut(lineBytes, parsed, put, i); - } - context.write(rowKey, put); - } catch (ImportTsv.TsvParser.BadTsvLineException | IllegalArgumentException - | InvalidLabelException badLine) { - if (logBadLines) { - System.err.println(value); - } - System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); - if (skipBadLines) { - incrementBadLineCount(1); - return; - } - throw new IOException(badLine); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - protected void populatePut(byte[] lineBytes, ImportTsv.TsvParser.ParsedLine parsed, Put put, - int i) throws BadTsvLineException, IOException { - Cell cell = null; - if (hfileOutPath == null) { - cell = new KeyValue(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), - parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, - parser.getQualifier(i).length, ts, KeyValue.Type.Put, lineBytes, - parsed.getColumnOffset(i), parsed.getColumnLength(i)); - if (cellVisibilityExpr != null) { - // We won't be validating the expression here. The Visibility CP will do - // the validation - put.setCellVisibility(new CellVisibility(cellVisibilityExpr)); - } - if (ttl > 0) { - put.setTTL(ttl); - } - } else { - // Creating the KV which needs to be directly written to HFiles. Using the Facade - // KVCreator for creation of kvs. 
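The reason for the branch: with a Put sent to a live cluster, TTL and visibility can ride as Put attributes and the RegionServer resolves them, but HFiles written for bulk load bypass the server entirely, so those properties must be baked into each cell as tags. A small sketch of building such a TTL tag, reusing the same classes this mapper already imports (the values are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

// A TTL baked into the cell itself, for the bulk-load (HFile) path.
public class TtlTagSketch {
  public static void main(String[] args) {
    long ttlMillis = 86_400_000L; // one day, example value
    List<Tag> tags = new ArrayList<>();
    tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttlMillis)));
    // These tags are then handed to CellCreator#create(...), as in the call
    // that follows, so every KeyValue in the HFile carries its own TTL.
    System.out.println("tags on cell: " + tags.size());
  }
}
```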
- cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(), - parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0, - parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i), - parsed.getColumnLength(i), tags); - } - put.add(cell); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java deleted file mode 100644 index 581f0d0..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterTextMapper.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Base64; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; - -/** - * Write table content out to map output files. - */ -@InterfaceAudience.Public -public class TsvImporterTextMapper -extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> -{ - - /** Column separator */ - private String separator; - - /** Should skip bad lines */ - private boolean skipBadLines; - private Counter badLineCount; - private boolean logBadLines; - - private ImportTsv.TsvParser parser; - - public boolean getSkipBadLines() { - return skipBadLines; - } - - public Counter getBadLineCount() { - return badLineCount; - } - - public void incrementBadLineCount(int count) { - this.badLineCount.increment(count); - } - - /** - * Handles initializing this class with objects specific to it (i.e., the parser). - * Common initialization that might be leveraged by a subclass is done in - * <code>doSetup</code>. Hence a subclass may choose to override this method - * and call <code>doSetup</code> as well before handling its own custom params.
- * - * @param context - */ - @Override - protected void setup(Context context) { - doSetup(context); - - Configuration conf = context.getConfiguration(); - - parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator); - if (parser.getRowKeyColumnIndex() == -1) { - throw new RuntimeException("No row key column specified"); - } - } - - /** - * Handles common parameter initialization that a subclass might want to leverage. - * @param context - */ - protected void doSetup(Context context) { - Configuration conf = context.getConfiguration(); - - // If a custom separator has been used, - // decode it back from Base64 encoding. - separator = conf.get(ImportTsv.SEPARATOR_CONF_KEY); - if (separator == null) { - separator = ImportTsv.DEFAULT_SEPARATOR; - } else { - separator = new String(Base64.decode(separator)); - } - - skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true); - logBadLines = context.getConfiguration().getBoolean(ImportTsv.LOG_BAD_LINES_CONF_KEY, false); - badLineCount = context.getCounter("ImportTsv", "Bad Lines"); - } - - /** - * Convert a line of TSV text into an HBase table row. - */ - @Override - public void map(LongWritable offset, Text value, Context context) throws IOException { - try { - Pair<Integer,Integer> rowKeyOffests = parser.parseRowKey(value.getBytes(), value.getLength()); - ImmutableBytesWritable rowKey = new ImmutableBytesWritable( - value.getBytes(), rowKeyOffests.getFirst(), rowKeyOffests.getSecond()); - context.write(rowKey, value); - } catch (ImportTsv.TsvParser.BadTsvLineException|IllegalArgumentException badLine) { - if (logBadLines) { - System.err.println(value); - } - System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage()); - if (skipBadLines) { - incrementBadLineCount(1); - return; - } - throw new IOException(badLine); - } catch (InterruptedException e) { - e.printStackTrace(); - Thread.currentThread().interrupt(); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java deleted file mode 100644 index a83a88f..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/VisibilityExpressionResolver.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hbase.Tag; - -/** - * Interface to convert visibility expressions into Tags for storing along with Cells in HFiles. - */ -@InterfaceAudience.Public -public interface VisibilityExpressionResolver extends Configurable { - - /** - * Giving a chance for the initialization. - */ - void init(); - - /** - * Convert visibility expression into tags to be serialized. - * @param visExpression the label expression - * @return The list of tags corresponds to the visibility expression. These tags will be stored - * along with the Cells. - */ - List<Tag> createVisibilityExpTags(String visExpression) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java deleted file mode 100644 index 8b4e967..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java +++ /dev/null @@ -1,344 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.hbase.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.util.StringUtils; - -/** - * Simple {@link InputFormat} for {@link org.apache.hadoop.hbase.wal.WAL} files. - */ -@InterfaceAudience.Public -public class WALInputFormat extends InputFormat<WALKey, WALEdit> { - private static final Log LOG = LogFactory.getLog(WALInputFormat.class); - - public static final String START_TIME_KEY = "wal.start.time"; - public static final String END_TIME_KEY = "wal.end.time"; - - /** - * {@link InputSplit} for {@link WAL} files. Each split represents - * exactly one log file. - */ - static class WALSplit extends InputSplit implements Writable { - private String logFileName; - private long fileSize; - private long startTime; - private long endTime; - - /** for serialization */ - public WALSplit() {} - - /** - * Represents a WALSplit, i.e. a single WAL file. - * Start- and EndTime are managed by the split, so that WAL files can be - * filtered before WALEdits are passed to the mapper(s). - * @param logFileName - * @param fileSize - * @param startTime - * @param endTime - */ - public WALSplit(String logFileName, long fileSize, long startTime, long endTime) { - this.logFileName = logFileName; - this.fileSize = fileSize; - this.startTime = startTime; - this.endTime = endTime; - } - - @Override - public long getLength() throws IOException, InterruptedException { - return fileSize; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - // TODO: Find the data node with the most blocks for this WAL?
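One plausible way to act on that TODO, sketched with plain HDFS APIs (not part of the patch; the method name is hypothetical): sum the bytes each datanode holds for the file and report the host with the most, instead of the empty array the code below returns.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Pick the host holding the most bytes of the WAL file.
public class WalLocationSketch {
  static String[] bestHost(Configuration conf, Path walPath) throws IOException {
    FileSystem fs = walPath.getFileSystem(conf);
    FileStatus status = fs.getFileStatus(walPath);
    Map<String, Long> bytesPerHost = new HashMap<>();
    for (BlockLocation loc : fs.getFileBlockLocations(status, 0, status.getLen())) {
      for (String host : loc.getHosts()) {
        bytesPerHost.merge(host, loc.getLength(), Long::sum); // bytes per datanode
      }
    }
    return bytesPerHost.entrySet().stream()
        .max(Map.Entry.comparingByValue())
        .map(e -> new String[] { e.getKey() })
        .orElse(new String[0]);
  }
}
```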
- return new String[] {}; - } - - public String getLogFileName() { - return logFileName; - } - - public long getStartTime() { - return startTime; - } - - public long getEndTime() { - return endTime; - } - - @Override - public void readFields(DataInput in) throws IOException { - logFileName = in.readUTF(); - fileSize = in.readLong(); - startTime = in.readLong(); - endTime = in.readLong(); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(logFileName); - out.writeLong(fileSize); - out.writeLong(startTime); - out.writeLong(endTime); - } - - @Override - public String toString() { - return logFileName + " (" + startTime + ":" + endTime + ") length:" + fileSize; - } - } - - /** - * {@link RecordReader} for an {@link WAL} file. - * Implementation shared with deprecated HLogInputFormat. - */ - static abstract class WALRecordReader<K extends WALKey> extends RecordReader<K, WALEdit> { - private Reader reader = null; - // visible until we can remove the deprecated HLogInputFormat - Entry currentEntry = new Entry(); - private long startTime; - private long endTime; - private Configuration conf; - private Path logFile; - private long currentPos; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - WALSplit hsplit = (WALSplit)split; - logFile = new Path(hsplit.getLogFileName()); - conf = context.getConfiguration(); - LOG.info("Opening reader for "+split); - openReader(logFile); - this.startTime = hsplit.getStartTime(); - this.endTime = hsplit.getEndTime(); - } - - private void openReader(Path path) throws IOException - { - closeReader(); - reader = AbstractFSWALProvider.openReader(path, conf); - seek(); - setCurrentPath(path); - } - - private void setCurrentPath(Path path) { - this.logFile = path; - } - - private void closeReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - } - - private void seek() throws IOException { - if (currentPos != 0) { - reader.seek(currentPos); - } - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (reader == null) return false; - this.currentPos = reader.getPosition(); - Entry temp; - long i = -1; - try { - do { - // skip older entries - try { - temp = reader.next(currentEntry); - i++; - } catch (EOFException x) { - LOG.warn("Corrupted entry detected. Ignoring the rest of the file." 
- + " (This is normal when a RegionServer crashed.)"); - return false; - } - } while (temp != null && temp.getKey().getWriteTime() < startTime); - - if (temp == null) { - if (i > 0) LOG.info("Skipped " + i + " entries."); - LOG.info("Reached end of file."); - return false; - } else if (i > 0) { - LOG.info("Skipped " + i + " entries, until ts: " + temp.getKey().getWriteTime() + "."); - } - boolean res = temp.getKey().getWriteTime() <= endTime; - if (!res) { - LOG.info("Reached ts: " + temp.getKey().getWriteTime() - + " ignoring the rest of the file."); - } - return res; - } catch (IOException e) { - Path archivedLog = AbstractFSWALProvider.getArchivedLogPath(logFile, conf); - if (logFile != archivedLog) { - openReader(archivedLog); - // Try call again in recursion - return nextKeyValue(); - } else { - throw e; - } - } - } - - @Override - public WALEdit getCurrentValue() throws IOException, InterruptedException { - return currentEntry.getEdit(); - } - - @Override - public float getProgress() throws IOException, InterruptedException { - // N/A depends on total number of entries, which is unknown - return 0; - } - - @Override - public void close() throws IOException { - LOG.info("Closing reader"); - if (reader != null) this.reader.close(); - } - } - - /** - * handler for non-deprecated WALKey version. fold into WALRecordReader once we no longer - * need to support HLogInputFormat. - */ - static class WALKeyRecordReader extends WALRecordReader<WALKey> { - @Override - public WALKey getCurrentKey() throws IOException, InterruptedException { - return currentEntry.getKey(); - } - } - - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException, - InterruptedException { - return getSplits(context, START_TIME_KEY, END_TIME_KEY); - } - - /** - * implementation shared with deprecated HLogInputFormat - */ - List<InputSplit> getSplits(final JobContext context, final String startKey, final String endKey) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - boolean ignoreMissing = conf.getBoolean(WALPlayer.IGNORE_MISSING_FILES, false); - Path[] inputPaths = getInputPaths(conf); - long startTime = conf.getLong(startKey, Long.MIN_VALUE); - long endTime = conf.getLong(endKey, Long.MAX_VALUE); - - List<FileStatus> allFiles = new ArrayList<FileStatus>(); - for(Path inputPath: inputPaths){ - FileSystem fs = inputPath.getFileSystem(conf); - try { - List<FileStatus> files = getFiles(fs, inputPath, startTime, endTime); - allFiles.addAll(files); - } catch (FileNotFoundException e) { - if (ignoreMissing) { - LOG.warn("File "+ inputPath +" is missing. 
Skipping it."); - continue; - } - throw e; - } - } - List<InputSplit> splits = new ArrayList<InputSplit>(allFiles.size()); - for (FileStatus file : allFiles) { - splits.add(new WALSplit(file.getPath().toString(), file.getLen(), startTime, endTime)); - } - return splits; - } - - private Path[] getInputPaths(Configuration conf) { - String inpDirs = conf.get(FileInputFormat.INPUT_DIR); - return StringUtils.stringToPath( - inpDirs.split(conf.get(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ","))); - } - - private List<FileStatus> getFiles(FileSystem fs, Path dir, long startTime, long endTime) - throws IOException { - List<FileStatus> result = new ArrayList<>(); - LOG.debug("Scanning " + dir.toString() + " for WAL files"); - - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dir); - if (!iter.hasNext()) return Collections.emptyList(); - while (iter.hasNext()) { - LocatedFileStatus file = iter.next(); - if (file.isDirectory()) { - // recurse into sub directories - result.addAll(getFiles(fs, file.getPath(), startTime, endTime)); - } else { - String name = file.getPath().toString(); - int idx = name.lastIndexOf('.'); - if (idx > 0) { - try { - long fileStartTime = Long.parseLong(name.substring(idx+1)); - if (fileStartTime <= endTime) { - LOG.info("Found: " + file); - result.add(file); - } - } catch (NumberFormatException x) { - idx = 0; - } - } - if (idx == 0) { - LOG.warn("File " + name + " does not appear to be an WAL file. Skipping..."); - } - } - } - return result; - } - - @Override - public RecordReader<WALKey, WALEdit> createRecordReader(InputSplit split, - TaskAttemptContext context) throws IOException, InterruptedException { - return new WALKeyRecordReader(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java deleted file mode 100644 index b1e655c..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * A tool to replay WAL files as an M/R job.
- * The WAL can be replayed for a set of tables or all tables,
- * and a time range can be provided (in milliseconds).
- * The WAL is filtered to the passed set of tables and the output
- * can optionally be mapped to another set of tables.
- *
- * WAL replay can also generate HFiles for later bulk importing;
- * in that case the WAL is replayed for a single table only.
- */
-@InterfaceAudience.Public
-public class WALPlayer extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(WALPlayer.class);
-  final static String NAME = "WALPlayer";
-  public final static String BULK_OUTPUT_CONF_KEY = "wal.bulk.output";
-  public final static String TABLES_KEY = "wal.input.tables";
-  public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
-  public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
-  public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
-
-
-  // This relies on Hadoop Configuration to handle warning about deprecated configs and
-  // to set the correct non-deprecated configs when an old one shows up.
-  static {
-    Configuration.addDeprecation("hlog.bulk.output", BULK_OUTPUT_CONF_KEY);
-    Configuration.addDeprecation("hlog.input.tables", TABLES_KEY);
-    Configuration.addDeprecation("hlog.input.tablesmap", TABLE_MAP_KEY);
-  }
-
-  private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
-
-  public WALPlayer() {
-  }
-
-  protected WALPlayer(final Configuration c) {
-    super(c);
-  }
-
-  /**
-   * A mapper that just writes out KeyValues.
-   * This one can be used together with {@link KeyValueSortReducer}.
-   */
-  static class WALKeyValueMapper
-      extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue> {
-    private byte[] table;
-
-    @Override
-    public void map(WALKey key, WALEdit value,
-        Context context)
-        throws IOException {
-      try {
-        // skip all other tables
-        if (Bytes.equals(table, key.getTablename().getName())) {
-          for (Cell cell : value.getCells()) {
-            KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-            if (WALEdit.isMetaEditFamily(kv)) {
-              continue;
-            }
-            context.write(new ImmutableBytesWritable(CellUtil.cloneRow(kv)), kv);
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      // only a single table is supported when HFiles are generated with HFileOutputFormat
-      String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tables == null || tables.length != 1) {
-        // this can only happen when the mapper is used directly by a class other than WALPlayer
-        throw new IOException("Exactly one table must be specified for bulk HFile case.");
-      }
-      table = Bytes.toBytes(tables[0]);
-
-    }
-
-  }
-
-  /**
-   * A mapper that writes out {@link Mutation}s to be directly applied to
-   * a running HBase instance.
-   */
-  protected static class WALMapper
-      extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
-    private Map<TableName, TableName> tables = new TreeMap<>();
-
-    @Override
-    public void map(WALKey key, WALEdit value, Context context)
-        throws IOException {
-      try {
-        if (tables.isEmpty() || tables.containsKey(key.getTablename())) {
-          TableName targetTable = tables.isEmpty() ?
-              key.getTablename() :
-              tables.get(key.getTablename());
-          ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
-          Put put = null;
-          Delete del = null;
-          Cell lastCell = null;
-          for (Cell cell : value.getCells()) {
-            // filtering WAL meta entries
-            if (WALEdit.isMetaEditFamily(cell)) {
-              continue;
-            }
-
-            // Allow a subclass to filter out this cell.
-            if (filter(context, cell)) {
-              // A WALEdit may contain multiple operations (HBASE-3584) and/or
-              // multiple rows (HBASE-5229).
-              // Aggregate as much as possible into a single Put/Delete
-              // operation before writing to the context.
-              if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
-                  || !CellUtil.matchingRow(lastCell, cell)) {
-                // row or type changed, write out aggregate KVs.
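-                // (At most one pending Put and one pending Delete are held at a
-                // time, so memory stays bounded even for very large WALEdits.)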
-                if (put != null) {
-                  context.write(tableOut, put);
-                }
-                if (del != null) {
-                  context.write(tableOut, del);
-                }
-                if (CellUtil.isDelete(cell)) {
-                  del = new Delete(CellUtil.cloneRow(cell));
-                } else {
-                  put = new Put(CellUtil.cloneRow(cell));
-                }
-              }
-              if (CellUtil.isDelete(cell)) {
-                del.add(cell);
-              } else {
-                put.add(cell);
-              }
-            }
-            lastCell = cell;
-          }
-          // write residual KVs
-          if (put != null) {
-            context.write(tableOut, put);
-          }
-          if (del != null) {
-            context.write(tableOut, del);
-          }
-        }
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    protected boolean filter(Context context, final Cell cell) {
-      return true;
-    }
-
-    @Override
-    protected void
-        cleanup(Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation>.Context context)
-        throws IOException, InterruptedException {
-      super.cleanup(context);
-    }
-
-    @Override
-    public void setup(Context context) throws IOException {
-      String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
-      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
-      if (tableMap == null) {
-        tableMap = tablesToUse;
-      }
-      if (tablesToUse == null) {
-        // Then user wants all tables.
-      } else if (tablesToUse.length != tableMap.length) {
-        // this can only happen when WALMapper is used directly by a class other than WALPlayer
-        throw new IOException("Incorrect table mapping specified.");
-      }
-      int i = 0;
-      if (tablesToUse != null) {
-        for (String table : tablesToUse) {
-          tables.put(TableName.valueOf(table),
-              TableName.valueOf(tableMap[i++]));
-        }
-      }
-    }
-  }
-
-  void setupTime(Configuration conf, String option) throws IOException {
-    String val = conf.get(option);
-    if (null == val) {
-      return;
-    }
-    long ms;
-    try {
-      // first try to parse in the user-friendly date form
-      ms = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS").parse(val).getTime();
-    } catch (ParseException pe) {
-      try {
-        // then see if just a number of milliseconds was specified
-        ms = Long.parseLong(val);
-      } catch (NumberFormatException nfe) {
-        throw new IOException(option
-            + " must be specified either in the form 2001-02-20T16:35:06.99 "
-            + "or as a number of milliseconds");
-      }
-    }
-    conf.setLong(option, ms);
-  }
-
-  /**
-   * Sets up the actual job.
-   *
-   * @param args The command line parameters.
-   * @return The newly created job.
-   * @throws IOException When setting up the job fails.
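-   *
-   * <p>For example, {@code args = { "/hbase/oldWALs", "orders", "orders_copy" }}
-   * would replay edits for table {@code orders} into {@code orders_copy}
-   * (the path and table names here are purely illustrative).</p>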
-   */
-  public Job createSubmittableJob(String[] args) throws IOException {
-    Configuration conf = getConf();
-    setupTime(conf, WALInputFormat.START_TIME_KEY);
-    setupTime(conf, WALInputFormat.END_TIME_KEY);
-    String inputDirs = args[0];
-    String[] tables = args[1].split(",");
-    String[] tableMap;
-    if (args.length > 2) {
-      tableMap = args[2].split(",");
-      if (tableMap.length != tables.length) {
-        throw new IOException("The same number of tables and mappings must be provided.");
-      }
-    } else {
-      // if no mapping is specified, map each table to itself
-      tableMap = tables;
-    }
-    conf.setStrings(TABLES_KEY, tables);
-    conf.setStrings(TABLE_MAP_KEY, tableMap);
-    conf.set(FileInputFormat.INPUT_DIR, inputDirs);
-    Job job = Job.getInstance(conf,
-        conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
-    job.setJarByClass(WALPlayer.class);
-
-    job.setInputFormatClass(WALInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-
-    String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
-    if (hfileOutPath != null) {
-      LOG.debug("Adding incremental-load job: " + hfileOutPath + " from " + inputDirs);
-
-      // the bulk HFile case
-      if (tables.length != 1) {
-        throw new IOException("Exactly one table must be specified for the bulk export option");
-      }
-      TableName tableName = TableName.valueOf(tables[0]);
-      job.setMapperClass(WALKeyValueMapper.class);
-      job.setReducerClass(KeyValueSortReducer.class);
-      Path outputDir = new Path(hfileOutPath);
-      FileOutputFormat.setOutputPath(job, outputDir);
-      job.setMapOutputValueClass(KeyValue.class);
-      try (Connection conn = ConnectionFactory.createConnection(conf);
-          Table table = conn.getTable(tableName);
-          RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
-        HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
-      }
-      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-          org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions.class);
-    } else {
-      // output to live cluster
-      job.setMapperClass(WALMapper.class);
-      job.setOutputFormatClass(MultiTableOutputFormat.class);
-      TableMapReduceUtil.addDependencyJars(job);
-      TableMapReduceUtil.initCredentials(job);
-      // No reducers.
-      job.setNumReduceTasks(0);
-    }
-    String codecCls = WALCellCodec.getWALCellCodecClass(conf);
-    try {
-      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
-          Class.forName(codecCls));
-    } catch (Exception e) {
-      throw new IOException("Cannot determine WAL codec class " + codecCls, e);
-    }
-    return job;
-  }
-
-
-  /**
-   * Print usage.
-   * @param errorMsg Error message. Can be null.
-   */
-  private void usage(final String errorMsg) {
-    if (errorMsg != null && errorMsg.length() > 0) {
-      System.err.println("ERROR: " + errorMsg);
-    }
-    System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
-    System.err.println("Read all WAL entries for <tables>.");
-    System.err.println("If no tables (\"\") are specified, all tables are imported.");
-    System.err.println("(Careful, even hbase:meta entries will be imported" +
-        " in that case.)");
-    System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
-    System.err.println("The WAL entries can be mapped to a new set of tables via <tableMappings>.");
-    System.err.println("<tableMappings> is a comma separated list of target tables.");
-    System.err.println("If specified, each table in <tables> must have a mapping.\n");
-    System.err.println("By default " + NAME + " will load data directly into HBase.");
-    System.err.println("To generate HFiles for a bulk data load instead, pass the option:");
-    System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output");
-    System.err.println(" (Only one table can be specified, and no mapping is allowed!)");
-    System.err.println("Other options: (specify a time range of WAL edits to consider)");
-    System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
-    System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
-    System.err.println(" -D " + JOB_NAME_CONF_KEY
-        + "=jobName - use the specified mapreduce job name for the WAL player");
-    System.err.println("For performance also consider the following options:\n"
-        + " -Dmapreduce.map.speculative=false\n"
-        + " -Dmapreduce.reduce.speculative=false");
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    int ret = ToolRunner.run(new WALPlayer(HBaseConfiguration.create()), args);
-    System.exit(ret);
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage("Wrong number of arguments: " + args.length);
-      System.exit(-1);
-    }
-    Job job = createSubmittableJob(args);
-    return job.waitForCompletion(true) ? 0 : 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/664b6be0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
deleted file mode 100644
index 199e168..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
-Provides HBase <a href="http://wiki.apache.org/hadoop/HadoopMapReduce">MapReduce</a>
-Input/OutputFormats, a table indexing MapReduce job, and utility methods.

-<p>See <a href="http://hbase.apache.org/book.html#mapreduce">HBase and MapReduce</a>
-in the HBase Reference Guide for documentation on running MapReduce over HBase.
-*/
-package org.apache.hadoop.hbase.mapreduce;
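For context, a minimal sketch of driving the WALPlayer tool deleted above, as its main() does via ToolRunner. The WAL directory, table names, and time-range value are illustrative assumptions, not values taken from this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
  import org.apache.hadoop.hbase.mapreduce.WALPlayer;
  import org.apache.hadoop.util.ToolRunner;

  public class WALPlayerDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Optional: only replay WAL edits at or after this time; setupTime() accepts
      // either the date form "yyyy-MM-dd'T'HH:mm:ss.SS" or plain milliseconds.
      conf.set(WALInputFormat.START_TIME_KEY, "2001-02-20T16:35:06.99");
      // Replay edits for table "orders" into "orders_copy" (illustrative names).
      int exitCode = ToolRunner.run(conf, new WALPlayer(),
          new String[] { "/hbase/oldWALs", "orders", "orders_copy" });
      System.exit(exitCode);
    }
  }

The same run can emit HFiles for bulk loading instead of writing to a live cluster by setting -Dwal.bulk.output=/path/for/output and naming exactly one table, as the usage text above notes.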
