This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch 2.6.x-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.6.x-hadoop3.1 by this push:
new e741e2a Backport HBASE-22887 to Kylin HFileOutputFormat3
e741e2a is described below
commit e741e2a196277c74065cac9f1e2c3d32f41496dc
Author: langdamao <[email protected]>
AuthorDate: Fri Dec 13 14:13:02 2019 +0800
Backport HBASE-22887 to Kylin HFileOutputFormat3
Signed-off-by: langdamao <[email protected]>
---
.../kylin/storage/hbase/steps/CubeHFileJob.java | 3 +-
.../storage/hbase/steps/HFileOutputFormat3.java | 794 ++++++++++++++-------
2 files changed, 536 insertions(+), 261 deletions(-)
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index c6ec255..6490ff7 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
@@ -109,7 +108,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
Table table = connection.getTable(TableName.valueOf(hTableName));
RegionLocator regionLocator =
connection.getRegionLocator(TableName.valueOf(hTableName));
// Automatic config !
- HFileOutputFormat2.configureIncrementalLoad(job, table,
regionLocator);
+ HFileOutputFormat3.configureIncrementalLoad(job, table,
regionLocator);
reconfigurePartitions(hbaseConf, partitionFilePath);
job.setInputFormatClass(SequenceFileInputFormat.class);
diff --git
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
index 12c30ea..1ec5887 100644
---
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
+++
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java
@@ -14,60 +14,76 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
+
package org.apache.kylin.storage.hbase.steps;
+import static
org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TASK_KEY;
+import static
org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
+import static
org.apache.hadoop.hbase.regionserver.HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY;
+import static
org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
+
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TextSortReducer;
+import org.apache.hadoop.hbase.mapreduce.CellSerialization;
+import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
+import org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -80,11 +96,14 @@ import
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.kylin.common.util.RandomUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
+import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
- * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2,
with fix attempt on KYLIN-2788
+ * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2,
with fix attempt on KYLIN-4293|HBASE-22887
*
* Writes HFiles. Passed Cells must arrive in order.
* Writes current time as the sequence id for the file. Sets the major
compacted
@@ -92,49 +111,117 @@ import com.google.common.annotations.VisibleForTesting;
* all HFiles being written.
* <p>
* Using this class as part of a MapReduce job is best done
- * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}.
+ * using {@link #configureIncrementalLoad(Job, TableDescriptor,
RegionLocator)}.
*/
@InterfaceAudience.Public
[email protected]
-public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable, Cell> {
- static Log LOG = LogFactory.getLog(HFileOutputFormat3.class);
+public class HFileOutputFormat3
+ extends FileOutputFormat<ImmutableBytesWritable, Cell> {
+ private static final Logger LOG =
LoggerFactory.getLogger(HFileOutputFormat3.class);
+ static class TableInfo {
+ private TableDescriptor tableDesctiptor;
+ private RegionLocator regionLocator;
+
+ public TableInfo(TableDescriptor tableDesctiptor, RegionLocator
regionLocator) {
+ this.tableDesctiptor = tableDesctiptor;
+ this.regionLocator = regionLocator;
+ }
+
+ /**
+ * The modification for the returned HTD doesn't affect the inner TD.
+ * @return A clone of inner table descriptor
+ * @deprecated use {@link #getTableDescriptor}
+ */
+ @Deprecated
+ public HTableDescriptor getHTableDescriptor() {
+ return new HTableDescriptor(tableDesctiptor);
+ }
+
+ public TableDescriptor getTableDescriptor() {
+ return tableDesctiptor;
+ }
+
+ public RegionLocator getRegionLocator() {
+ return regionLocator;
+ }
+ }
+
+ protected static final byte[] tableSeparator = Bytes.toBytes(";");
+
+ protected static byte[] combineTableNameSuffix(byte[] tableName, byte[]
suffix) {
+ return Bytes.add(tableName, tableSeparator, suffix);
+ }
// The following constants are private since these are used by
// HFileOutputFormat2 to internally transfer data between job setup and
// reducer run using conf.
// These should not be changed by the client.
- private static final String COMPRESSION_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.compression";
- private static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
"hbase.hfileoutputformat.families.bloomtype";
- private static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.blocksize";
- private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
+ static final String COMPRESSION_FAMILIES_CONF_KEY =
+ "hbase.hfileoutputformat.families.compression";
+ static final String BLOOM_TYPE_FAMILIES_CONF_KEY =
+ "hbase.hfileoutputformat.families.bloomtype";
+ static final String BLOCK_SIZE_FAMILIES_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.blocksize";
+ static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.families.datablock.encoding";
// This constant is public since the client can modify this when setting
// up their conf object and thus refer to this symbol.
// It is present for backwards compatibility reasons. Use it only to
// override the auto-detection of datablock encoding.
- public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
"hbase.mapreduce.hfileoutputformat.datablock.encoding";
+ public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.datablock.encoding";
+
+ /**
+ * Keep locality while generating HFiles for bulkload. See HBASE-12596
+ */
+ public static final String LOCALITY_SENSITIVE_CONF_KEY =
+ "hbase.bulkload.locality.sensitive.enabled";
+ private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;
+ static final String OUTPUT_TABLE_NAME_CONF_KEY =
+ "hbase.mapreduce.hfileoutputformat.table.name";
+ static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
+ "hbase.mapreduce.use.multi.table.hfileoutputformat";
+
+ public static final String STORAGE_POLICY_PROPERTY =
HStore.BLOCK_STORAGE_POLICY_KEY;
+ public static final String STORAGE_POLICY_PROPERTY_CF_PREFIX =
STORAGE_POLICY_PROPERTY + ".";
@Override
- public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final
TaskAttemptContext context)
- throws IOException, InterruptedException {
+ public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(
+ final TaskAttemptContext context) throws IOException,
InterruptedException {
return createRecordWriter(context, this.getOutputCommitter(context));
}
- static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
createRecordWriter(final TaskAttemptContext context,
- final OutputCommitter committer) throws IOException,
InterruptedException {
+ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName,
byte[] family) {
+ return combineTableNameSuffix(tableName, family);
+ }
+
+ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V>
+ createRecordWriter(final TaskAttemptContext context, final OutputCommitter
committer)
+ throws IOException {
// Get the path of the temporary output file
- final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
+ final Path outputDir = ((FileOutputCommitter)committer).getWorkPath();
final Configuration conf = context.getConfiguration();
- LOG.debug("Task output path: " + outputdir);
- final FileSystem fs = outputdir.getFileSystem(conf);
+ final boolean writeMultipleTables =
conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false) ;
+ final String writeTableNames = conf.get(OUTPUT_TABLE_NAME_CONF_KEY);
+ if (writeTableNames==null || writeTableNames.isEmpty()) {
+ throw new IllegalArgumentException("Configuration parameter " +
OUTPUT_TABLE_NAME_CONF_KEY
+ + " cannot be empty");
+ }
+ final FileSystem fs = outputDir.getFileSystem(conf);
// These configs. are from hbase-*.xml
- final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
+ final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE);
// Invented config. Add to hbase-*.xml if other than default
compression.
- final String defaultCompressionStr = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
- final Algorithm defaultCompression =
AbstractHFileWriter.compressionByName(defaultCompressionStr);
- final boolean compactionExclude =
conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
- false);
+ final String defaultCompressionStr = conf.get("hfile.compression",
+ Compression.Algorithm.NONE.getName());
+ final Algorithm defaultCompression = HFileWriterImpl
+ .compressionByName(defaultCompressionStr);
+ final boolean compactionExclude = conf.getBoolean(
+ "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
+
+ final Set<String> allTableNames = Arrays.stream(writeTableNames.split(
+ Bytes.toString(tableSeparator))).collect(Collectors.toSet());
// create a map from column family to the compression algorithm
final Map<byte[], Algorithm> compressionMap =
createFamilyCompressionMap(conf);
@@ -142,7 +229,8 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
final Map<byte[], Integer> blockSizeMap =
createFamilyBlockSizeMap(conf);
String dataBlockEncodingStr =
conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
- final Map<byte[], DataBlockEncoding> datablockEncodingMap =
createFamilyDataBlockEncodingMap(conf);
+ final Map<byte[], DataBlockEncoding> datablockEncodingMap
+ = createFamilyDataBlockEncodingMap(conf);
final DataBlockEncoding overriddenEncoding;
if (dataBlockEncodingStr != null) {
overriddenEncoding =
DataBlockEncoding.valueOf(dataBlockEncodingStr);
@@ -152,118 +240,265 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
return new RecordWriter<ImmutableBytesWritable, V>() {
// Map of families to writers and how much has been output on the
writer.
- private final Map<byte[], WriterLength> writers = new
TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
- private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
- private final byte[] now =
Bytes.toBytes(System.currentTimeMillis());
- private boolean rollRequested = false;
+ private final Map<byte[], WriterLength> writers =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], byte[]> previousRows =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ private final long now = EnvironmentEdgeManager.currentTime();
@Override
- public void write(ImmutableBytesWritable row, V cell) throws
IOException {
- KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ public void write(ImmutableBytesWritable row, V cell)
+ throws IOException {
+ Cell kv = cell;
+ // null input == user explicitly wants to flush
if (row == null && kv == null) {
- rollWriters();
+ rollWriters(null);
return;
}
+
byte[] rowKey = CellUtil.cloneRow(kv);
- long length = kv.getLength();
+ int length = (PrivateCellUtil.estimatedSerializedSizeOf(kv)) -
Bytes.SIZEOF_INT;
byte[] family = CellUtil.cloneFamily(kv);
- WriterLength wl = this.writers.get(family);
- if (wl == null) {
- fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
+ byte[] tableNameBytes = null;
+ if (writeMultipleTables) {
+ tableNameBytes =
HFileOutputFormat3.getTableName(row.get());
+ if
(!allTableNames.contains(Bytes.toString(tableNameBytes))) {
+ throw new IllegalArgumentException("TableName '" +
Bytes.toString(tableNameBytes) +
+ "' not" + " expected");
+ }
+ } else {
+ tableNameBytes = Bytes.toBytes(writeTableNames);
}
- if (wl != null && wl.written + length >= maxsize) {
- this.rollRequested = true;
+ byte[] tableAndFamily =
getTableNameSuffixedWithFamily(tableNameBytes, family);
+ WriterLength wl = this.writers.get(tableAndFamily);
+
+ // If this is a new column family, verify that the directory
exists
+ if (wl == null) {
+ Path writerPath = null;
+ if (writeMultipleTables) {
+ writerPath = new Path(outputDir, new
Path(Bytes.toString(tableNameBytes), Bytes
+ .toString(family)));
+ } else {
+ writerPath = new Path(outputDir,
Bytes.toString(family));
+ }
+ fs.mkdirs(writerPath);
+ configureStoragePolicy(conf, fs, tableAndFamily,
writerPath);
}
- if (rollRequested && Bytes.compareTo(this.previousRow, rowKey)
!= 0) {
- rollWriters();
+
+ // This can only happen once a row is finished though
+ if (wl != null && wl.written + length >= maxsize
+ && Bytes.compareTo(this.previousRows.get(family),
rowKey) != 0) {
+ rollWriters(wl);
}
+
+ // create a new WAL writer, if necessary
if (wl == null || wl.writer == null) {
- wl = getNewWriter(family, conf);
+ if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY,
DEFAULT_LOCALITY_SENSITIVE)) {
+ HRegionLocation loc = null;
+
+ String tableName = Bytes.toString(tableNameBytes);
+ if (tableName != null) {
+ try (Connection connection =
ConnectionFactory.createConnection(conf);
+ RegionLocator locator =
+
connection.getRegionLocator(TableName.valueOf(tableName))) {
+ loc = locator.getRegionLocation(rowKey);
+ } catch (Throwable e) {
+ LOG.warn("There's something wrong when
locating rowkey: " +
+ Bytes.toString(rowKey) + " for
tablename: " + tableName, e);
+ loc = null;
+ }
+ }
+
+ if (null == loc) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("failed to get region location, so
use default writer for rowkey: " +
+ Bytes.toString(rowKey));
+ }
+ wl = getNewWriter(tableNameBytes, family, conf,
null);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("first rowkey: [" +
Bytes.toString(rowKey) + "]");
+ }
+ InetSocketAddress initialIsa =
+ new InetSocketAddress(loc.getHostname(),
loc.getPort());
+ if (initialIsa.isUnresolved()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("failed to resolve bind address:
" + loc.getHostname() + ":"
+ + loc.getPort() + ", so use
default writer");
+ }
+ wl = getNewWriter(tableNameBytes, family,
conf, null);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("use favored nodes writer: " +
initialIsa.getHostString());
+ }
+ wl = getNewWriter(tableNameBytes, family,
conf, new InetSocketAddress[] { initialIsa
+ });
+ }
+ }
+ } else {
+ wl = getNewWriter(tableNameBytes, family, conf, null);
+ }
}
- kv.updateLatestStamp(this.now);
+
+ // we now have the proper WAL writer. full steam ahead
+ PrivateCellUtil.updateLatestStamp(cell, this.now);
wl.writer.append(kv);
wl.written += length;
- this.previousRow = rowKey;
+
+ // Copy the row so we know when a row transition.
+ this.previousRows.put(family, rowKey);
}
- private void rollWriters() throws IOException {
- for (WriterLength wl : this.writers.values()) {
- if (wl.writer != null) {
- LOG.info("Writer=" + wl.writer.getPath() +
((wl.written == 0) ? "" : ", wrote=" + wl.written));
- close(wl.writer);
+ private void rollWriters(WriterLength writerLength) throws
IOException {
+ if (writerLength != null) {
+ closeWriter(writerLength);
+ } else {
+ for (WriterLength wl : this.writers.values()) {
+ closeWriter(wl);
}
- wl.writer = null;
- wl.written = 0;
}
- this.rollRequested = false;
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
"BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
- private WriterLength getNewWriter(byte[] family, Configuration
conf) throws IOException {
+ private void closeWriter(WriterLength wl) throws IOException {
+ if (wl.writer != null) {
+ LOG.info(
+ "Writer=" + wl.writer.getPath() + ((wl.written ==
0)? "": ", wrote=" + wl.written));
+ close(wl.writer);
+ }
+ wl.writer = null;
+ wl.written = 0;
+ }
+
+ /*
+ * Create a new StoreFile.Writer.
+ * @param family
+ * @return A WriterLength, containing a new StoreFile.Writer.
+ * @throws IOException
+ */
+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BX_UNBOXING_IMMEDIATELY_REBOXED",
+ justification="Not important")
+ private WriterLength getNewWriter(byte[] tableName, byte[] family,
Configuration
+ conf, InetSocketAddress[] favoredNodes) throws IOException
{
+ byte[] tableAndFamily =
getTableNameSuffixedWithFamily(tableName, family);
+ Path familydir = new Path(outputDir, Bytes.toString(family));
+ if (writeMultipleTables) {
+ familydir = new Path(outputDir,
+ new Path(Bytes.toString(tableName),
Bytes.toString(family)));
+ }
WriterLength wl = new WriterLength();
- Path familydir = new Path(outputdir, Bytes.toString(family));
- Algorithm compression = compressionMap.get(family);
+ Algorithm compression = compressionMap.get(tableAndFamily);
compression = compression == null ? defaultCompression :
compression;
- BloomType bloomType = bloomTypeMap.get(family);
+ BloomType bloomType = bloomTypeMap.get(tableAndFamily);
bloomType = bloomType == null ? BloomType.NONE : bloomType;
- Integer blockSize = blockSizeMap.get(family);
+ Integer blockSize = blockSizeMap.get(tableAndFamily);
blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE :
blockSize;
DataBlockEncoding encoding = overriddenEncoding;
- encoding = encoding == null ? datablockEncodingMap.get(family)
: encoding;
+ encoding = encoding == null ?
datablockEncodingMap.get(tableAndFamily) : encoding;
encoding = encoding == null ? DataBlockEncoding.NONE :
encoding;
Configuration tempConf = new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
- HFileContextBuilder contextBuilder = new
HFileContextBuilder().withCompression(compression)
+ HFileContextBuilder contextBuilder = new HFileContextBuilder()
+ .withCompression(compression)
.withChecksumType(HStore.getChecksumType(conf))
-
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+ .withBlockSize(blockSize);
+
+ if (HFile.getFormatVersion(conf) >=
HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
+ contextBuilder.withIncludesTags(true);
+ }
+
contextBuilder.withDataBlockEncoding(encoding);
HFileContext hFileContext = contextBuilder.build();
+ if (null == favoredNodes) {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new
CacheConfig(tempConf), fs)
+
.withOutputDir(familydir).withBloomType(bloomType)
+
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext).build();
+ } else {
+ wl.writer =
+ new StoreFileWriter.Builder(conf, new
CacheConfig(tempConf), new HFileSystem(fs))
+
.withOutputDir(familydir).withBloomType(bloomType)
+
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
+ .withFavoredNodes(favoredNodes).build();
+ }
- wl.writer = new StoreFile.WriterBuilder(conf, new
CacheConfig(tempConf), fs).withOutputDir(familydir)
-
.withBloomType(bloomType).withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
- .build();
-
- this.writers.put(family, wl);
+ this.writers.put(tableAndFamily, wl);
return wl;
}
- private void close(final StoreFile.Writer w) throws IOException {
+ private void close(final StoreFileWriter w) throws IOException {
if (w != null) {
- w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
- w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(context.getTaskAttemptID().toString()));
- w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true));
-
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
+ w.appendFileInfo(BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ w.appendFileInfo(BULKLOAD_TASK_KEY,
+
Bytes.toBytes(context.getTaskAttemptID().toString()));
+ w.appendFileInfo(MAJOR_COMPACTION_KEY,
+ Bytes.toBytes(true));
+ w.appendFileInfo(EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+ Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
w.close();
}
}
@Override
- public void close(TaskAttemptContext c) throws IOException,
InterruptedException {
- for (WriterLength wl : this.writers.values()) {
+ public void close(TaskAttemptContext c)
+ throws IOException, InterruptedException {
+ for (WriterLength wl: this.writers.values()) {
close(wl.writer);
}
}
};
}
+ /**
+ * Configure block storage policy for CF after the directory is created.
+ */
+ static void configureStoragePolicy(final Configuration conf, final
FileSystem fs,
+ byte[] tableAndFamily, Path cfPath) {
+ if (null == conf || null == fs || null == tableAndFamily || null ==
cfPath) {
+ return;
+ }
+
+ String policy =
+ conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX +
Bytes.toString(tableAndFamily),
+ conf.get(STORAGE_POLICY_PROPERTY));
+ FSUtils.setStoragePolicy(fs, cfPath, policy);
+ }
+
/*
* Data structure to hold a Writer and amount of data written on it.
*/
static class WriterLength {
long written = 0;
- StoreFile.Writer writer = null;
+ StoreFileWriter writer = null;
}
/**
* Return the start keys of all of the regions in this table,
* as a list of ImmutableBytesWritable.
*/
- private static List<ImmutableBytesWritable>
getRegionStartKeys(RegionLocator table) throws IOException {
- byte[][] byteKeys = table.getStartKeys();
- ArrayList<ImmutableBytesWritable> ret = new
ArrayList<ImmutableBytesWritable>(byteKeys.length);
- for (byte[] byteKey : byteKeys) {
- ret.add(new ImmutableBytesWritable(byteKey));
+ private static List<ImmutableBytesWritable>
getRegionStartKeys(List<RegionLocator> regionLocators,
+ boolean
writeMultipleTables)
+ throws IOException {
+
+ ArrayList<ImmutableBytesWritable> ret = new ArrayList<>();
+ for(RegionLocator regionLocator : regionLocators) {
+ TableName tableName = regionLocator.getName();
+ LOG.info("Looking up current regions for table " + tableName);
+ byte[][] byteKeys = regionLocator.getStartKeys();
+ for (byte[] byteKey : byteKeys) {
+ byte[] fullKey = byteKey; //HFileOutputFormat2 use case
+ if (writeMultipleTables) {
+ //MultiTableHFileOutputFormat use case
+ fullKey = combineTableNameSuffix(tableName.getName(),
byteKey);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SplitPoint startkey for table [" + tableName +
"]: ["
+ + Bytes.toStringBinary(fullKey) + "]");
+ }
+ ret.add(new ImmutableBytesWritable(fullKey));
+ }
}
return ret;
}
@@ -273,8 +508,8 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* {@link TotalOrderPartitioner} that contains the split points in
startKeys.
*/
@SuppressWarnings("deprecation")
- private static void writePartitions(Configuration conf, Path
partitionsPath, List<ImmutableBytesWritable> startKeys)
- throws IOException {
+ private static void writePartitions(Configuration conf, Path
partitionsPath,
+ List<ImmutableBytesWritable>
startKeys, boolean writeMultipleTables) throws IOException {
LOG.info("Writing partition information to " + partitionsPath);
if (startKeys.isEmpty()) {
throw new IllegalArgumentException("No regions passed");
@@ -284,18 +519,22 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
// have keys < the first region (which has an empty start key)
// so we need to remove it. Otherwise we would end up with an
// empty reducer with index 0
- TreeSet<ImmutableBytesWritable> sorted = new
TreeSet<ImmutableBytesWritable>(startKeys);
-
+ TreeSet<ImmutableBytesWritable> sorted = new TreeSet<>(startKeys);
ImmutableBytesWritable first = sorted.first();
- if (!Arrays.equals(first.get(), HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IllegalArgumentException("First region of table should
have empty start key. Instead has: "
- + Bytes.toStringBinary(first.get()));
+ if (writeMultipleTables) {
+ first = new
ImmutableBytesWritable(HFileOutputFormat3.getSuffix(sorted.first().get()));
+ }
+ if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IllegalArgumentException(
+ "First region of table should have empty start key.
Instead has: "
+ + Bytes.toStringBinary(first.get()));
}
- sorted.remove(first);
+ sorted.remove(sorted.first());
// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
partitionsPath, ImmutableBytesWritable.class,
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ fs, conf, partitionsPath, ImmutableBytesWritable.class,
NullWritable.class);
try {
@@ -320,30 +559,10 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* </ul>
* The user should be sure to set the map output value class to either
KeyValue or Put before
* running this function.
- *
- * @deprecated Use {@link #configureIncrementalLoad(Job, Table,
RegionLocator)} instead.
*/
- @Deprecated
- public static void configureIncrementalLoad(Job job, HTable table) throws
IOException {
- configureIncrementalLoad(job, table.getTableDescriptor(),
table.getRegionLocator());
- }
-
- /**
- * Configure a MapReduce Job to perform an incremental load into the given
- * table. This
- * <ul>
- * <li>Inspects the table to configure a total order partitioner</li>
- * <li>Uploads the partitions file to the cluster and adds it to the
DistributedCache</li>
- * <li>Sets the number of reduce tasks to match the current number of
regions</li>
- * <li>Sets the output key/value class to match HFileOutputFormat2's
requirements</li>
- * <li>Sets the reducer up to perform the appropriate sorting (either
KeyValueSortReducer or
- * PutSortReducer)</li>
- * </ul>
- * The user should be sure to set the map output value class to either
KeyValue or Put before
- * running this function.
- */
- public static void configureIncrementalLoad(Job job, Table table,
RegionLocator regionLocator) throws IOException {
- configureIncrementalLoad(job, table.getTableDescriptor(),
regionLocator);
+ public static void configureIncrementalLoad(Job job, Table table,
RegionLocator regionLocator)
+ throws IOException {
+ configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
}
/**
@@ -360,23 +579,34 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* The user should be sure to set the map output value class to either
KeyValue or Put before
* running this function.
*/
- public static void configureIncrementalLoad(Job job, HTableDescriptor
tableDescriptor, RegionLocator regionLocator)
- throws IOException {
- configureIncrementalLoad(job, tableDescriptor, regionLocator,
HFileOutputFormat3.class);
+ public static void configureIncrementalLoad(Job job, TableDescriptor
tableDescriptor,
+ RegionLocator regionLocator)
throws IOException {
+ ArrayList<TableInfo> singleTableInfo = new ArrayList<>();
+ singleTableInfo.add(new TableInfo(tableDescriptor, regionLocator));
+ configureIncrementalLoad(job, singleTableInfo,
HFileOutputFormat3.class);
}
- static void configureIncrementalLoad(Job job, HTableDescriptor
tableDescriptor, RegionLocator regionLocator,
- Class<? extends OutputFormat<?, ?>> cls) throws IOException,
UnsupportedEncodingException {
+ static void configureIncrementalLoad(Job job, List<TableInfo>
multiTableInfo,
+ Class<? extends OutputFormat<?, ?>>
cls) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
+ job.setOutputValueClass(MapReduceExtendedCell.class);
job.setOutputFormatClass(cls);
+ if (multiTableInfo.stream().distinct().count() !=
multiTableInfo.size()) {
+ throw new IllegalArgumentException("Duplicate entries found in
TableInfo argument");
+ }
+ boolean writeMultipleTables = false;
+ if (MultiTableHFileOutputFormat.class.equals(cls)) {
+ writeMultipleTables = true;
+ conf.setBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, true);
+ }
// Based on the configured map output class, set the correct reducer
to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
- if (KeyValue.class.equals(job.getMapOutputValueClass())) {
- job.setReducerClass(KeyValueSortReducer.class);
+ if (KeyValue.class.equals(job.getMapOutputValueClass())
+ ||
MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())) {
+ job.setReducerClass(CellSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
@@ -385,44 +615,75 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
LOG.warn("Unknown map output value type:" +
job.getMapOutputValueClass());
}
- conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(),
- ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
+ conf.setStrings("io.serializations", conf.get("io.serializations"),
+ MutationSerialization.class.getName(),
ResultSerialization.class.getName(),
+ CellSerialization.class.getName());
+ if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY,
DEFAULT_LOCALITY_SENSITIVE)) {
+ LOG.info("bulkload locality sensitive enabled");
+ }
+
+ /* Now get the region start keys for every table required */
+ List<String> allTableNames = new ArrayList<>(multiTableInfo.size());
+ List<RegionLocator> regionLocators = new
ArrayList<>(multiTableInfo.size());
+ List<TableDescriptor> tableDescriptors = new
ArrayList<>(multiTableInfo.size());
+
+ for (TableInfo tableInfo : multiTableInfo) {
+ regionLocators.add(tableInfo.getRegionLocator());
+
allTableNames.add(tableInfo.getRegionLocator().getName().getNameAsString());
+ tableDescriptors.add(tableInfo.getTableDescriptor());
+ }
+ // Record tablenames for creating writer by favored nodes, and
decoding compression, block size and other attributes of columnfamily per table
+ conf.set(OUTPUT_TABLE_NAME_CONF_KEY, StringUtils.join(allTableNames,
Bytes
+ .toString(tableSeparator)));
+ List<ImmutableBytesWritable> startKeys =
getRegionStartKeys(regionLocators, writeMultipleTables);
// Use table's region boundaries for TOP split points.
- LOG.info("Looking up current regions for table " +
tableDescriptor.getTableName());
- List<ImmutableBytesWritable> startKeys =
getRegionStartKeys(regionLocator);
- LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
+ LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
+ "to match current region count for all tables");
job.setNumReduceTasks(startKeys.size());
- configurePartitioner(job, startKeys);
+ configurePartitioner(job, startKeys, writeMultipleTables);
// Set compression algorithms based on column families
- configureCompression(conf, tableDescriptor);
- configureBloomType(tableDescriptor, conf);
- configureBlockSize(tableDescriptor, conf);
- configureDataBlockEncoding(tableDescriptor, conf);
+
+ conf.set(COMPRESSION_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(compressionDetails,
+ tableDescriptors));
+ conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(blockSizeDetails,
+ tableDescriptors));
+ conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
serializeColumnFamilyAttribute(bloomTypeDetails,
+ tableDescriptors));
+ conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(dataBlockEncodingDetails,
tableDescriptors));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + regionLocator.getName() + " output
configured.");
+ LOG.info("Incremental output configured for tables: " +
StringUtils.join(allTableNames, ","));
}
- public static void configureIncrementalLoadMap(Job job, Table table)
throws IOException {
+ public static void configureIncrementalLoadMap(Job job, TableDescriptor
tableDescriptor) throws
+ IOException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
- job.setOutputValueClass(KeyValue.class);
+ job.setOutputValueClass(MapReduceExtendedCell.class);
job.setOutputFormatClass(HFileOutputFormat3.class);
+ ArrayList<TableDescriptor> singleTableDescriptor = new ArrayList<>(1);
+ singleTableDescriptor.add(tableDescriptor);
+
+ conf.set(OUTPUT_TABLE_NAME_CONF_KEY,
tableDescriptor.getTableName().getNameAsString());
// Set compression algorithms based on column families
- configureCompression(conf, table.getTableDescriptor());
- configureBloomType(table.getTableDescriptor(), conf);
- configureBlockSize(table.getTableDescriptor(), conf);
- HTableDescriptor tableDescriptor = table.getTableDescriptor();
- configureDataBlockEncoding(tableDescriptor, conf);
+ conf.set(COMPRESSION_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(compressionDetails,
singleTableDescriptor));
+ conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(blockSizeDetails,
singleTableDescriptor));
+ conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(bloomTypeDetails,
singleTableDescriptor));
+ conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
+ serializeColumnFamilyAttribute(dataBlockEncodingDetails,
singleTableDescriptor));
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
- LOG.info("Incremental table " + table.getName() + " output
configured.");
+ LOG.info("Incremental table " + tableDescriptor.getTableName() + "
output configured.");
}
/**
@@ -433,11 +694,13 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* @return a map from column family to the configured compression algorithm
*/
@VisibleForTesting
- static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
COMPRESSION_FAMILIES_CONF_KEY);
- Map<byte[], Algorithm> compressionMap = new TreeMap<byte[],
Algorithm>(Bytes.BYTES_COMPARATOR);
+ static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration
+ conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ COMPRESSION_FAMILIES_CONF_KEY);
+ Map<byte[], Algorithm> compressionMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
- Algorithm algorithm =
AbstractHFileWriter.compressionByName(e.getValue());
+ Algorithm algorithm =
HFileWriterImpl.compressionByName(e.getValue());
compressionMap.put(e.getKey(), algorithm);
}
return compressionMap;
@@ -452,8 +715,9 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
*/
@VisibleForTesting
static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf)
{
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
BLOOM_TYPE_FAMILIES_CONF_KEY);
- Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[],
BloomType>(Bytes.BYTES_COMPARATOR);
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOOM_TYPE_FAMILIES_CONF_KEY);
+ Map<byte[], BloomType> bloomTypeMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
BloomType bloomType = BloomType.valueOf(e.getValue());
bloomTypeMap.put(e.getKey(), bloomType);
@@ -470,8 +734,9 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
*/
@VisibleForTesting
static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
BLOCK_SIZE_FAMILIES_CONF_KEY);
- Map<byte[], Integer> blockSizeMap = new TreeMap<byte[],
Integer>(Bytes.BYTES_COMPARATOR);
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ BLOCK_SIZE_FAMILIES_CONF_KEY);
+ Map<byte[], Integer> blockSizeMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
Integer blockSize = Integer.parseInt(e.getValue());
blockSizeMap.put(e.getKey(), blockSize);
@@ -488,15 +753,18 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* configured data block type for the family
*/
@VisibleForTesting
- static Map<byte[], DataBlockEncoding>
createFamilyDataBlockEncodingMap(Configuration conf) {
- Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
- Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[],
DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
+ static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(
+ Configuration conf) {
+ Map<byte[], String> stringMap = createFamilyConfValueMap(conf,
+ DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
+ Map<byte[], DataBlockEncoding> encoderMap = new
TreeMap<>(Bytes.BYTES_COMPARATOR);
for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
encoderMap.put(e.getKey(),
DataBlockEncoding.valueOf((e.getValue())));
}
return encoderMap;
}
+
/**
* Run inside the task to deserialize column family to given conf value
map.
*
@@ -504,8 +772,9 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* @param confName conf key to read from the configuration
* @return a map of column family to the given configuration value
*/
- private static Map<byte[], String> createFamilyConfValueMap(Configuration
conf, String confName) {
- Map<byte[], String> confValMap = new TreeMap<byte[],
String>(Bytes.BYTES_COMPARATOR);
+ private static Map<byte[], String> createFamilyConfValueMap(
+ Configuration conf, String confName) {
+ Map<byte[], String> confValMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
String confVal = conf.get(confName, "");
for (String familyConf : confVal.split("&")) {
String[] familySplit = familyConf.split("=");
@@ -513,7 +782,7 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
continue;
}
try {
- confValMap.put(URLDecoder.decode(familySplit[0],
"UTF-8").getBytes(StandardCharsets.UTF_8),
+ confValMap.put(Bytes.toBytes(URLDecoder.decode(familySplit[0],
"UTF-8")),
URLDecoder.decode(familySplit[1], "UTF-8"));
} catch (UnsupportedEncodingException e) {
// will not happen with UTF-8 encoding
@@ -527,13 +796,18 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning
against
* <code>splitPoints</code>. Cleans up the partitions file after job
exists.
*/
- static void configurePartitioner(Job job, List<ImmutableBytesWritable>
splitPoints) throws IOException {
+ static void configurePartitioner(Job job, List<ImmutableBytesWritable>
splitPoints, boolean
+ writeMultipleTables)
+ throws IOException {
Configuration conf = job.getConfiguration();
// create the partitions file
FileSystem fs = FileSystem.get(conf);
- Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"),
"partitions_" + RandomUtil.randomUUID());
+ String hbaseTmpFsDir =
+ conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
+ HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
+ Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" +
RandomUtil.randomUUID());
fs.makeQualified(partitionsPath);
- writePartitions(conf, partitionsPath, splitPoints);
+ writePartitions(conf, partitionsPath, splitPoints,
writeMultipleTables);
fs.deleteOnExit(partitionsPath);
// configure job to use it
@@ -541,134 +815,136 @@ public class HFileOutputFormat3 extends
FileOutputFormat<ImmutableBytesWritable,
TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
- /**
- * Serialize column family to compression algorithm map to configuration.
- * Invoked while configuring the MR job for incremental load.
- *
- * @param table to read the properties from
- * @param conf to persist serialized values into
- * @throws IOException
- * on failure to read column family descriptors
- */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
@VisibleForTesting
- static void configureCompression(Configuration conf, HTableDescriptor
tableDescriptor)
+ static String
serializeColumnFamilyAttribute(Function<ColumnFamilyDescriptor, String> fn,
List<TableDescriptor> allTables)
throws UnsupportedEncodingException {
- StringBuilder compressionConfigValue = new StringBuilder();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
+ StringBuilder attributeValue = new StringBuilder();
int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- compressionConfigValue.append('&');
+ for (TableDescriptor tableDescriptor : allTables) {
+ if (tableDescriptor == null) {
+ // could happen with mock table instance
+ // CODEREVIEW: Can I set an empty string in conf if mock table
instance?
+ return "";
+ }
+ for (ColumnFamilyDescriptor familyDescriptor :
tableDescriptor.getColumnFamilies()) {
+ if (i++ > 0) {
+ attributeValue.append('&');
+ }
+ attributeValue.append(URLEncoder.encode(
+
Bytes.toString(combineTableNameSuffix(tableDescriptor.getTableName().getName(),
familyDescriptor.getName())),
+ "UTF-8"));
+ attributeValue.append('=');
+
attributeValue.append(URLEncoder.encode(fn.apply(familyDescriptor), "UTF-8"));
}
-
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
"UTF-8"));
- compressionConfigValue.append('=');
-
compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(),
"UTF-8"));
}
// Get rid of the last ampersand
- conf.set(COMPRESSION_FAMILIES_CONF_KEY,
compressionConfigValue.toString());
+ return attributeValue.toString();
}
/**
- * Serialize column family to block size map to configuration.
+ * Serialize column family to compression algorithm map to configuration.
* Invoked while configuring the MR job for incremental load.
+ *
* @param tableDescriptor to read the properties from
* @param conf to persist serialized values into
+ * @throws IOException
+ * on failure to read column family descriptors
+ */
+ @VisibleForTesting
+ static Function<ColumnFamilyDescriptor, String> compressionDetails =
familyDescriptor ->
+ familyDescriptor.getCompressionType().getName();
+
+ /**
+ * Serialize column family to block size map to configuration. Invoked
while
+ * configuring the MR job for incremental load.
+ *
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureBlockSize(HTableDescriptor tableDescriptor,
Configuration conf)
- throws UnsupportedEncodingException {
- StringBuilder blockSizeConfigValue = new StringBuilder();
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- blockSizeConfigValue.append('&');
- }
-
blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
"UTF-8"));
- blockSizeConfigValue.append('=');
-
blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()),
"UTF-8"));
- }
- // Get rid of the last ampersand
- conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY,
blockSizeConfigValue.toString());
- }
+ static Function<ColumnFamilyDescriptor, String> blockSizeDetails =
familyDescriptor -> String
+ .valueOf(familyDescriptor.getBlocksize());
/**
- * Serialize column family to bloom type map to configuration.
- * Invoked while configuring the MR job for incremental load.
- * @param tableDescriptor to read the properties from
- * @param conf to persist serialized values into
+ * Serialize column family to bloom type map to configuration. Invoked
while
+ * configuring the MR job for incremental load.
+ *
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
*
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureBloomType(HTableDescriptor tableDescriptor,
Configuration conf)
- throws UnsupportedEncodingException {
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
- }
- StringBuilder bloomTypeConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- bloomTypeConfigValue.append('&');
- }
-
bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
"UTF-8"));
- bloomTypeConfigValue.append('=');
- String bloomType =
familyDescriptor.getBloomFilterType().toString();
- if (bloomType == null) {
- bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
- }
- bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
+ static Function<ColumnFamilyDescriptor, String> bloomTypeDetails =
familyDescriptor -> {
+ String bloomType = familyDescriptor.getBloomFilterType().toString();
+ if (bloomType == null) {
+ bloomType =
ColumnFamilyDescriptorBuilder.DEFAULT_BLOOMFILTER.name();
}
- conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY,
bloomTypeConfigValue.toString());
- }
+ return bloomType;
+ };
/**
* Serialize column family to data block encoding map to configuration.
* Invoked while configuring the MR job for incremental load.
*
- * @param table to read the properties from
- * @param conf to persist serialized values into
+ * @param tableDescriptor
+ * to read the properties from
+ * @param conf
+ * to persist serialized values into
* @throws IOException
* on failure to read column family descriptors
*/
@VisibleForTesting
- static void configureDataBlockEncoding(HTableDescriptor tableDescriptor,
Configuration conf)
- throws UnsupportedEncodingException {
- if (tableDescriptor == null) {
- // could happen with mock table instance
- return;
+ static Function<ColumnFamilyDescriptor, String> dataBlockEncodingDetails =
familyDescriptor -> {
+ DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
+ if (encoding == null) {
+ encoding = DataBlockEncoding.NONE;
}
- StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
- Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
- int i = 0;
- for (HColumnDescriptor familyDescriptor : families) {
- if (i++ > 0) {
- dataBlockEncodingConfigValue.append('&');
- }
-
dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(),
"UTF-8"));
- dataBlockEncodingConfigValue.append('=');
- DataBlockEncoding encoding =
familyDescriptor.getDataBlockEncoding();
- if (encoding == null) {
- encoding = DataBlockEncoding.NONE;
- }
-
dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(),
"UTF-8"));
+ return encoding.toString();
+ };
+
+ /**
+ * Copy from HBase's
org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+ * so that it's protect function can be used.
+ */
+ final private static int validateCompositeKey(byte[] keyBytes) {
+
+ int separatorIdx = Bytes.indexOf(keyBytes, tableSeparator);
+
+ // Either the separator was not found or a tablename wasn't present or
a key wasn't present
+ if (separatorIdx == -1) {
+ throw new IllegalArgumentException("Invalid format for composite
key [" + Bytes
+ .toStringBinary(keyBytes) + "]. Cannot extract tablename
and suffix from key");
}
- conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
dataBlockEncodingConfigValue.toString());
+ return separatorIdx;
+ }
+
+ /**
+ * Copy from HBase's
org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+ * so that it's protect function can be used.
+ */
+ protected static byte[] getTableName(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, 0, separatorIdx);
}
+
+ /**
+ * Copy from HBase's
org.apache.hadoop.hbase.mapreduce.MultiTableHFileOutputFormat,
+ * so that it's protect function can be used.
+ */
+
+ protected static byte[] getSuffix(byte[] keyBytes) {
+ int separatorIdx = validateCompositeKey(keyBytes);
+ return Bytes.copy(keyBytes, separatorIdx+1, keyBytes.length -
separatorIdx - 1);
+ }
+
}