Repository: crunch Updated Branches: refs/heads/master 047d8fd36 -> d65ba78e6
CRUNCH-644 Supply preferred node for HFile writes Designate the preferred HDFS data node when creating HFiles for bulk load to improve data locality of the created HFiles. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d65ba78e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d65ba78e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d65ba78e Branch: refs/heads/master Commit: d65ba78e6da4a72a0d836aa2b5b2dd32cc72eed9 Parents: 047d8fd Author: Gabriel Reid <[email protected]> Authored: Thu Apr 27 14:52:16 2017 +0200 Committer: Gabriel Reid <[email protected]> Committed: Mon May 8 08:41:09 2017 +0200 ---------------------------------------------------------------------- .../io/hbase/RegionLocationTableTest.java | 137 ++++++++++++++++++ .../io/hbase/HFileOutputFormatForCrunch.java | 74 +++++++--- .../org/apache/crunch/io/hbase/HFileUtils.java | 21 ++- .../crunch/io/hbase/RegionLocationTable.java | 143 +++++++++++++++++++ 4 files changed, 358 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d65ba78e/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java new file mode 100644 index 0000000..fa500bf --- /dev/null +++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/RegionLocationTableTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.io.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.junit.Before; +import org.junit.Test; + +public class RegionLocationTableTest { + + private static final String TABLE_NAME = "DATA_TABLE"; + private RegionLocationTable regionLocationTable; + + @Before + public void setUp() { // four contiguous regions: [-inf,10) A, [10,20) B, [20,30) C, [30,+inf) D + regionLocationTable = RegionLocationTable.create(TABLE_NAME, + ImmutableList.of( + location(null, new byte[] { 10 }, "serverA"), + location(new byte[] { 10 }, new byte[] { 20 }, "serverB"), + location(new byte[] { 20 }, new byte[] { 30 }, "serverC"), + location(new byte[] { 30 }, null, "serverD"))); + } + + @Test + public void testLookupRowInFirstRegion() { + assertEquals( + InetSocketAddress.createUnresolved("serverA", 0), + regionLocationTable.getPreferredNodeForRow(new byte[] { 5 })); + } + + @Test + public void 
testLookupRowInNonBoundaryRegion() { + assertEquals( + InetSocketAddress.createUnresolved("serverC", 0), + regionLocationTable.getPreferredNodeForRow(new byte[] { 25 })); + } + + @Test + public void testLookupRowInLastRegion() { + assertEquals( + InetSocketAddress.createUnresolved("serverD", 0), + regionLocationTable.getPreferredNodeForRow(new byte[] { 35 })); + } + + @Test + public void testLookupRowOnRegionBoundary() { // a row equal to a region's start key belongs to that region + assertEquals( + InetSocketAddress.createUnresolved("serverB", 0), + regionLocationTable.getPreferredNodeForRow(new byte[] { 10 })); + } + + @Test + public void testEmpty() { // with no regions known there is no preferred node + RegionLocationTable emptyTable = RegionLocationTable.create(TABLE_NAME, + ImmutableList.<HRegionLocation>of()); + + assertNull( + emptyTable.getPreferredNodeForRow(new byte[] { 10 })); + } + + @Test + public void testSerializationRoundTrip() throws IOException { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(byteOutputStream); + + regionLocationTable.serialize(dataOutput); + + ByteArrayInputStream byteInputStream = new ByteArrayInputStream(byteOutputStream.toByteArray()); + DataInput dataInput = new DataInputStream(byteInputStream); + + RegionLocationTable deserialized = RegionLocationTable.deserialize(dataInput); + + // Spot-check one lookup to confirm the deserialized table resolves rows like the original + assertEquals( + InetSocketAddress.createUnresolved("serverA", 0), + deserialized.getPreferredNodeForRow(new byte[] { 5 })); + } + + @Test + public void testSerializationRoundTrip_EmptyTable() throws IOException { + ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(byteOutputStream); + + RegionLocationTable emptyTable = RegionLocationTable.create(TABLE_NAME, + ImmutableList.<HRegionLocation>of()); + + emptyTable.serialize(dataOutput); + + ByteArrayInputStream byteInputStream = new 
ByteArrayInputStream(byteOutputStream.toByteArray()); + DataInput dataInput = new 
DataInputStream(byteInputStream); + + RegionLocationTable deserialized = RegionLocationTable.deserialize(dataInput); + + // A round-tripped empty table must still report no preferred node for any row + assertNull( + deserialized.getPreferredNodeForRow(new byte[] { 10 })); + } + + private static HRegionLocation location(byte[] startKey, byte[] endKey, String hostName) { // builds a region [startKey, endKey) hosted on hostName + return new HRegionLocation( + new HRegionInfo(TableName.valueOf(TABLE_NAME), startKey, endKey), + ServerName.valueOf(hostName, 60020, System.currentTimeMillis())); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/d65ba78e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java index 0b6ae2f..50d5a0b 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileOutputFormatForCrunch.java @@ -19,17 +19,22 @@ */ package org.apache.crunch.io.hbase; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteStreams; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import 
org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -47,6 +52,7 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; +import java.net.InetSocketAddress; /** * This is a thin wrapper of {@link HFile.Writer}. It only calls {@link HFile.Writer#append} @@ -71,8 +77,8 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { public RecordWriter<Object, Cell> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException { Path outputPath = getDefaultWorkFile(context, ""); - Configuration conf = context.getConfiguration(); - FileSystem fs = outputPath.getFileSystem(conf); + final Configuration conf = context.getConfiguration(); + FileSystem fs = new HFileSystem(outputPath.getFileSystem(conf)); // NOTE(review): HFileSystem wrapper presumably needed for withFavoredNodes below to take effect - confirm final boolean compactionExclude = conf.getBoolean( COMPACTION_EXCLUDE_CONF_KEY, false); @@ -93,17 +99,26 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { LOG.info("HColumnDescriptor: {}", hcol.toString()); Configuration noCacheConf = new Configuration(conf); noCacheConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); - final StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs) + final StoreFile.WriterBuilder writerBuilder = new StoreFile.WriterBuilder(conf, new CacheConfig(noCacheConf), fs) .withComparator(KeyValue.COMPARATOR) .withFileContext(getContext(hcol)) .withFilePath(outputPath) - .withBloomType(hcol.getBloomFilterType()) - .build(); + .withBloomType(hcol.getBloomFilterType()); return new RecordWriter<Object, Cell>() { + + StoreFile.Writer writer = null; // created lazily on the first write() + @Override public void write(Object row, Cell cell) throws IOException { + + if (writer == null) { // the first cell's row decides the preferred node for the whole HFile + 
writer = writerBuilder + .withFavoredNodes(getPreferredNodes(conf, cell)) + .build(); + } + KeyValue copy = KeyValue.cloneAndAddTags(cell, ImmutableList.<Tag>of()); 
if (copy.getTimestamp() == HConstants.LATEST_TIMESTAMP) { copy.updateLatestStamp(now); @@ -114,21 +129,48 @@ public class HFileOutputFormatForCrunch extends FileOutputFormat<Object, Cell> { @Override public void close(TaskAttemptContext c) throws IOException { - writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, - Bytes.toBytes(System.currentTimeMillis())); - writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, - Bytes.toBytes(context.getTaskAttemptID().toString())); - writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(true)); - writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, - Bytes.toBytes(compactionExclude)); - writer.appendFileInfo(StoreFile.TIMERANGE_KEY, - WritableUtils.toByteArray(trt)); - writer.close(); + if (writer != null) { // null means write() was never called, so no HFile was created + writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, + Bytes.toBytes(System.currentTimeMillis())); + writer.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, + Bytes.toBytes(context.getTaskAttemptID().toString())); + writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, + Bytes.toBytes(true)); + writer.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, + Bytes.toBytes(compactionExclude)); + writer.appendFileInfo(StoreFile.TIMERANGE_KEY, + WritableUtils.toByteArray(trt)); + writer.close(); + } } }; } + /** + * Returns the "preferred" nodes for the given cell as a single-element array, or null if no preferred node can be found. The "preferred" + * node for a cell is defined as the host where the region server is located that is hosting the region that will + * contain the given cell.
+ */ + private InetSocketAddress[] getPreferredNodes(Configuration conf, Cell cell) throws IOException { + String regionLocationFilePathStr = conf.get(RegionLocationTable.REGION_LOCATION_TABLE_PATH); + if (regionLocationFilePathStr != null) { + LOG.debug("Reading region location file from {}", regionLocationFilePathStr); + Path regionLocationPath = new Path(regionLocationFilePathStr); + try (FSDataInputStream inputStream = regionLocationPath.getFileSystem(conf).open(regionLocationPath)) { + RegionLocationTable regionLocationTable = RegionLocationTable.deserialize(inputStream); + InetSocketAddress preferredNodeForRow = regionLocationTable.getPreferredNodeForRow(CellUtil.cloneRow(cell)); + if (preferredNodeForRow != null) { + return new InetSocketAddress[] { preferredNodeForRow }; + } else { + return null; + } + } + } else { // no table path configured: write without a preferred node + LOG.warn("No region location file path found in configuration"); + return null; + } + } + private HFileContext getContext(HColumnDescriptor desc) { HFileContext ctxt = new HFileContext(); ctxt.setDataBlockEncoding(desc.getDataBlockEncoding()); http://git-wip-us.apache.org/repos/asf/crunch/blob/d65ba78e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java index 4cbe1c5..0db536b 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileUtils.java @@ -51,6 +51,7 @@ import org.apache.crunch.impl.dist.DistributedPipeline; import org.apache.crunch.lib.sort.TotalOrderPartitioner; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.CellUtil; @@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.TimeRange; @@ -399,11 +401,20 @@ public final class HFileUtils { return; } PCollection<C> partitioned = sortAndPartition(cells, table, limitToAffectedRegions); + RegionLocationTable regionLocationTable = RegionLocationTable.create( + table.getName().getNameAsString(), + ((RegionLocator) table).getAllRegionLocations()); + Path regionLocationFilePath = new Path(((DistributedPipeline) cells.getPipeline()).createTempPath(), + "regionLocations" + table.getName().getNameAsString().replace(':', '_')); // namespaced tables are named "ns:table", and Path would parse a ':' before the first '/' as a URI scheme + writeRegionLocationTable(cells.getPipeline().getConfiguration(), regionLocationFilePath, regionLocationTable); + for (HColumnDescriptor f : families) { byte[] family = f.getName(); + HFileTarget hfileTarget = new HFileTarget(new Path(outputPath, Bytes.toString(family)), f); + hfileTarget.outputConf(RegionLocationTable.REGION_LOCATION_TABLE_PATH, regionLocationFilePath.toString()); partitioned .filter(new FilterByFamilyFn<C>(family)) - .write(new HFileTarget(new Path(outputPath, Bytes.toString(family)), f)); + .write(hfileTarget); } } @@ -580,6 +591,14 @@ public final class HFileUtils { writer.close(); } + private static void writeRegionLocationTable(Configuration conf, Path outputPath, // serializes the table to outputPath; each HFileTarget receives that path via REGION_LOCATION_TABLE_PATH + RegionLocationTable regionLocationTable) throws IOException { + LOG.info("Writing region location table for {} to {}", regionLocationTable.getTableName(), outputPath); + try (FSDataOutputStream fsDataOutputStream = outputPath.getFileSystem(conf).create(outputPath)) { + 
regionLocationTable.serialize(fsDataOutputStream); + } + } + private static Result doCombineIntoRow(List<KeyValue> kvs, int versions) { // shortcut 
for the common case if (kvs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/d65ba78e/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/RegionLocationTable.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/RegionLocationTable.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/RegionLocationTable.java new file mode 100644 index 0000000..fa012af --- /dev/null +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/RegionLocationTable.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.io.hbase; + +import javax.annotation.Nullable; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Provides lookup functionality for the region server location for row keys in an HBase table.
+ * <p> + * This is a helper class to optimize the locality of HFiles created with {@link HFileOutputFormatForCrunch}, by + * specifying the name of the region server which is hosting the region of a given row as the preferred HDFS data node + * for hosting the written HFile. This is intended to ensure that bulk-created HFiles will be available on the local + * filesystem on the region servers using the created HFile, thus allowing short-circuit reads to the local file system + * on the bulk-created HFiles. + */ +class RegionLocationTable { + + /** + * Per-output configuration key which contains the path to a serialized region location table. + */ + public static final String REGION_LOCATION_TABLE_PATH = "crunch.hfileregionlocation.path"; + + private final String tableName; + private final NavigableMap<byte[], String> regionStartToServerHostName; // region start key -> region server host name, ordered by Bytes.BYTES_COMPARATOR + + public static RegionLocationTable create(String tableName, List<HRegionLocation> regionLocationList) { // builds the lookup table from the given region locations + NavigableMap<byte[], String> regionStartToServerHostName = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (HRegionLocation regionLocation : regionLocationList) { + byte[] startKey = regionLocation.getRegionInfo().getStartKey(); + if (startKey == null) { // a missing start key is stored as the empty start row, i.e. the first region + startKey = HConstants.EMPTY_START_ROW; + } + regionStartToServerHostName.put( + startKey, + regionLocation.getServerName().getHostname()); + } + return new RegionLocationTable(tableName, regionStartToServerHostName); + } + + private RegionLocationTable(String tableName, + NavigableMap<byte[], String> regionStartToServerHostName) { + this.tableName = tableName; + this.regionStartToServerHostName = regionStartToServerHostName; + } + + /** + * Returns the name of the HBase table to which this region location table applies. + * + * @return name of the related HBase table + */ + public String getTableName() { + return tableName; + } + + /** + * Returns the optional preferred node for a row.
+ * <p> + * The return value of this method is an {@link InetSocketAddress} to be in line with the HFile API (and + * underlying HDFS API) which use InetSocketAddress. The port number is always 0 on the returned InetSocketAddress, + * as it is not known from outside the scope of a region server. The HDFS API is implemented to deal "correctly" + * with this, mapping host name to a random data node on the same machine, which is sufficient for the purposes + * here. + * <p> + * The return value will be null if no preferred node is known for the given row. + * + * @param rowKey row key of the row for which the preferred node is to be calculated + * @return socket address of the preferred storage node for the given row, or null + */ + @Nullable + public InetSocketAddress getPreferredNodeForRow(byte[] rowKey) { + Map.Entry<byte[], String> matchingEntry = regionStartToServerHostName.floorEntry(rowKey); // the region with the greatest start key <= rowKey + if (matchingEntry != null) { + return InetSocketAddress.createUnresolved(matchingEntry.getValue(), 0); + } else { + return null; + } + } + + /** + * Serialize this table to a {@link DataOutput}. The serialized value can be deserialized via the + * {@link #deserialize(DataInput)} method. + * + * @param dataOutput output to which the table is to be serialized + */ + public void serialize(DataOutput dataOutput) throws IOException { + dataOutput.writeUTF(tableName); // format: table name (UTF), entry count (int), then per entry key length (int), key bytes, host name (UTF) + dataOutput.writeInt(regionStartToServerHostName.size()); + for (Map.Entry<byte[], String> regionToHostEntry : regionStartToServerHostName.entrySet()) { + byte[] rowKey = regionToHostEntry.getKey(); + dataOutput.writeInt(rowKey.length); + dataOutput.write(rowKey); + dataOutput.writeUTF(regionToHostEntry.getValue()); + } + } + + /** + * Deserialize a table which was serialized with the {@link #serialize(DataOutput)} method.
+ * + * @param dataInput input containing a serialized instance of this class + * @return the deserialized table + */ + public static RegionLocationTable deserialize(DataInput dataInput) throws IOException { + NavigableMap<byte[], String> regionStartToServerHostName = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + String tableName = dataInput.readUTF(); + int numEntries = dataInput.readInt(); + for (int i = 0; i < numEntries; i++) { + int rowKeyLength = dataInput.readInt(); // NOTE(review): not validated; a corrupt negative length fails with NegativeArraySizeException + byte[] rowKey = new byte[rowKeyLength]; + dataInput.readFully(rowKey, 0, rowKeyLength); + String hostName = dataInput.readUTF(); + regionStartToServerHostName.put(rowKey, hostName); + } + return new RegionLocationTable(tableName, regionStartToServerHostName); + } +}
