Author: khorgath
Date: Thu Dec 8 22:35:28 2011
New Revision: 1212174
URL: http://svn.apache.org/viewvc?rev=1212174&view=rev
Log:
HCATALOG-171 HBaseBulkOutputStorageDriver should work with secured hadoop
(toffer via khorgath)
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Dec 8 22:35:28 2011
@@ -81,6 +81,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS

   BUG FIXES
+  HCAT-171. HBaseBulkOutputStorageDriver should work with secured hadoop (toffer via khorgath)
+
   HCAT-170. HBaseBulkOSD fails to launch ImportSequenceFile because of missing jars in dist cache (toffer via khorgath)
   HCAT-176. Class not found exception when running TestPigStorageDriver (daijy via khorgath)
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -38,7 +38,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.UUID;
/**
 * Base class share by both {@link HBaseBulkOutputStorageDriver} and {@link HBaseDirectOutputStorageDriver}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Thu Dec 8 22:35:28 2011
@@ -5,7 +5,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -33,6 +36,12 @@ class HBaseBulkOutputFormat extends Outp
     @Override
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
         baseOutputFormat.checkOutputSpecs(context);
+        //Get jobTracker delegation token if security is enabled
+        //we need to launch the ImportSequenceFile job
+        if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+            JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
+            context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
+        }
     }

@Override
@@ -124,7 +133,7 @@ class HBaseBulkOutputFormat extends Outp
             Configuration conf = jobContext.getConfiguration();
             Path srcPath = FileOutputFormat.getOutputPath(jobContext);
             Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
-            ImportSequenceFile.runJob(conf,
+            ImportSequenceFile.runJob(jobContext,
                                       conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
                                       srcPath,
                                       destPath);
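The checkOutputSpecs() change above follows the standard pattern for secure Hadoop when a job must itself submit another MR job (here, the ImportSequenceFile importer launched from the output committer): fetch a JobTracker delegation token up front, on the client side where kerberos credentials exist, and stash it in the job's Credentials. A minimal self-contained sketch of that pattern, assuming the security-enabled Hadoop 0.20/1.x APIs this patch targets (the helper class name is illustrative, not part of the patch; the token alias matches the one used above):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.JobContext;

    class DelegationTokenHelper {
        // Fetch a JobTracker delegation token and store it under an alias in
        // the job's Credentials; tasks of this job can then submit a child MR
        // job without performing a fresh kerberos login.
        static void addJobTrackerToken(JobContext context)
                throws IOException, InterruptedException {
            if (context.getConfiguration().getBoolean("hadoop.security.authorization", false)) {
                JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
                // null renewer: acceptable for a short-lived child job that
                // finishes well before the token would need renewal
                context.getCredentials().addToken(new Text("my mr token"),
                        jobClient.getDelegationToken(null));
            }
        }
    }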
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -18,7 +18,10 @@
package org.apache.hcatalog.hbase;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -52,13 +55,27 @@ public class HBaseBulkOutputStorageDrive
         //initialize() gets called multiple time in the lifecycle of an MR job, client, mapper, reducer, etc
         //depending on the case we have to make sure for some context variables we set here that they don't get set again
         if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
-            String location = new Path(context.getConfiguration().get(PROPERTY_TABLE_LOCATION),
+            String tableLocation = context.getConfiguration().get(PROPERTY_TABLE_LOCATION);
+            String location = new Path(tableLocation,
                                        "REVISION_"+outputJobInfo.getProperties()
                                                                 .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
             outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
             //We are writing out an intermediate sequenceFile hence location is not passed in OutputJobInfo.getLocation()
             //TODO replace this with a mapreduce constant when available
             context.getConfiguration().set("mapred.output.dir", location);
+            //Temporary fix until support for secure hbase is available
+            //We need the intermediate directory to be world readable
+            //so that the hbase user can import the generated hfiles
+            if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
+                Path p = new Path(tableLocation);
+                FileSystem fs = FileSystem.get(context.getConfiguration());
+                fs.setPermission(new Path(tableLocation),
+                                 FsPermission.valueOf("drwx--x--x"));
+                while((p = p.getParent()) != null) {
+                    if(!fs.getFileStatus(p).getPermission().getOtherAction().implies(FsAction.EXECUTE))
+                        throw new IOException("Table's parent directories must at least have global execute permissions.");
+                }
+            }
         }
         outputFormat = new HBaseBulkOutputFormat();
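The permission workaround above has two parts: the intermediate output directory is made world-executable, and every ancestor directory is checked for global execute so the hbase user can actually traverse down to the generated HFiles. The ancestor walk in isolation, as a sketch (the helper class name is illustrative; the FileSystem and FsPermission calls are the same ones the patch uses):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;

    class PathTraversalCheck {
        // Walk from dir's parent up to the filesystem root and fail fast if
        // any ancestor denies execute ("search") permission to other users.
        static void assertWorldTraversable(Configuration conf, Path dir) throws IOException {
            FileSystem fs = FileSystem.get(conf);
            for (Path p = dir.getParent(); p != null; p = p.getParent()) {
                FsAction other = fs.getFileStatus(p).getPermission().getOtherAction();
                if (!other.implies(FsAction.EXECUTE)) {
                    throw new IOException("Missing global execute permission on: " + p);
                }
            }
        }
    }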
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Thu Dec 8 22:35:28 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
@@ -186,14 +187,15 @@ class ImportSequenceFile {
     /**
      * Method to run the Importer MapReduce Job. Normally will be called by another MR job
-     * during OutputCommitter.commitJob(). It wil inherit
-     * @param parentConf configuration of the parent job
+     * during OutputCommitter.commitJob().
+     * @param parentContext JobContext of the parent job
      * @param tableName name of table to bulk load data into
      * @param InputDir path of SequenceFile formatted data to read
      * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
      * @return
      */
-    static boolean runJob(Configuration parentConf, String tableName, Path InputDir, Path scratchDir) {
+    static boolean runJob(JobContext parentContext, String tableName, Path InputDir, Path scratchDir) {
+        Configuration parentConf = parentContext.getConfiguration();
         Configuration conf = new Configuration();
         for(Map.Entry<String,String> el: parentConf) {
             if(el.getKey().startsWith("hbase."))
@@ -206,6 +208,13 @@ class ImportSequenceFile {
conf.set("mapred.job.classpath.archives",parentConf.get("mapred.job.classpath.archives",
""));
conf.set("mapreduce.job.cache.archives.visibilities",parentConf.get("mapreduce.job.cache.archives.visibilities",""));
+ //Temporary fix until hbase security is ready
+ //We need the written HFile to be world readable so
+ //hbase regionserver user has the privileges to perform a hdfs move
+ if(parentConf.getBoolean("hadoop.security.authorization", false)) {
+ FsPermission.setUMask(conf, FsPermission.valueOf("----------"));
+ }
+
conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
@@ -218,6 +227,7 @@ class ImportSequenceFile {
throw new IOException("Importer work directory already exists:
"+workDir);
Job job = createSubmittableJob(conf, tableName, InputDir,
scratchDir, localMode);
job.setWorkingDirectory(workDir);
+ job.getCredentials().addAll(parentContext.getCredentials());
success = job.waitForCompletion(true);
fs.delete(workDir, true);
//We only cleanup on success because failure might've been caused
by existence of target directory
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1212174&r1=1212173&r2=1212174&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Thu Dec 8 22:35:28 2011
@@ -1,5 +1,7 @@
package org.apache.hcatalog.hbase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@ import org.apache.hcatalog.data.HCatReco
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
import org.junit.Test;
import java.io.IOException;
@@ -44,6 +47,8 @@ import static org.junit.Assert.assertTru
  * Including ImprtSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat
  */
 public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
+    private final static Log LOG = LogFactory.getLog(TestHBaseBulkOutputStorageDriver.class);
+
private final HiveConf allConf;
private final HCatDriver hcatDriver;
@@ -65,6 +70,7 @@ public class TestHBaseBulkOutputStorageD
}
     public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String vals[] = value.toString().split(",");
@@ -99,6 +105,7 @@ public class TestHBaseBulkOutputStorageD
     public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "hbaseBulkOutputFormatTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -120,15 +127,14 @@ public class TestHBaseBulkOutputStorageD
         // input/output settings
         Path inputPath = new Path(methodTestDir,"mr_input");
-        getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
             os.write(Bytes.toBytes(line + "\n"));
         os.close();

         Path interPath = new Path(methodTestDir,"inter");
-
         //create job
         Job job = new Job(conf, testName);
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
         job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
@@ -174,6 +180,7 @@ public class TestHBaseBulkOutputStorageD
     public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
String testName = "importSequenceFileTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String tableName = newTableName(testName).toLowerCase();
byte[] tableNameBytes = Bytes.toBytes(tableName);
@@ -205,6 +212,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf, testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapWrite.class);
@@ -225,7 +233,9 @@ public class TestHBaseBulkOutputStorageD
         assertTrue(job.waitForCompletion(true));
-        assertTrue(ImportSequenceFile.runJob(job.getConfiguration(),tableName,interPath,scratchPath));
+        job = new Job(new Configuration(allConf),testName+"_importer");
+        HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
+        assertTrue(ImportSequenceFile.runJob(job, tableName, interPath, scratchPath));
//verify
HTable table = new HTable(conf, tableName);
@@ -252,6 +262,7 @@ public class TestHBaseBulkOutputStorageD
public void hbaseBulkOutputStorageDriverTest() throws Exception {
String testName = "hbaseBulkOutputStorageDriverTest";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -291,6 +302,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf,testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -355,6 +367,7 @@ public class TestHBaseBulkOutputStorageD
     public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
Path methodTestDir = new Path(getTestDir(),testName);
+ LOG.info("starting: "+testName);
String databaseName = testName.toLowerCase();
String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
@@ -394,6 +407,7 @@ public class TestHBaseBulkOutputStorageD
//create job
Job job = new Job(conf,testName);
+ HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
job.setJarByClass(this.getClass());
job.setMapperClass(MapHCatWrite.class);
@@ -459,7 +473,8 @@ public class TestHBaseBulkOutputStorageD
String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
"(key int, english string, spanish string)
STORED BY " +
"'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
- "TBLPROPERTIES
('hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
+ "TBLPROPERTIES (" +
+
"'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')"
;
assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
@@ -520,3 +535,4 @@ public class TestHBaseBulkOutputStorageD
}
}
+
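For callers, the visible API change is that ImportSequenceFile.runJob() now takes the parent JobContext instead of a bare Configuration, so credentials and configuration flow through to the importer. A usage sketch modeled on the updated importSequenceFileTest above (assumes code living in the org.apache.hcatalog.hbase package, since ImportSequenceFile is package-private; the wrapper class is illustrative):

    package org.apache.hcatalog.hbase;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    class ImporterUsage {
        // Build a dedicated Job to act as the importer's parent context; it
        // carries the configuration, dependency jars, and credentials. Job
        // extends JobContext in this Hadoop line, so it can be passed
        // directly to runJob().
        static boolean runImporter(Configuration conf, String tableName,
                                   Path interPath, Path scratchPath) throws Exception {
            Job job = new Job(new Configuration(conf), tableName + "_importer");
            HBaseHCatStorageHandler.addDependencyJars(job.getConfiguration());
            return ImportSequenceFile.runJob(job, tableName, interPath, scratchPath);
        }
    }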