Author: travis
Date: Tue Sep 18 18:48:23 2012
New Revision: 1387314
URL: http://svn.apache.org/viewvc?rev=1387314&view=rev
Log:
HCATALOG-490 HCatStorer() throws error when the same partition key is present
in records in more than one task running as part of the same job
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Sep 18 18:48:23 2012
@@ -109,6 +109,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-490 HCatStorer() throws error when the same partition key is present in
records in more than one task running as part of the same job (amalakar via
traviscrawford)
+
HCAT-499 Multiple store commands does not work with Hadoop23 (rohinip via
avandana)
HCAT-501 HBase storage handler tests failing in trunk (traviscrawford)
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
(original)
+++
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
Tue Sep 18 18:48:23 2012
@@ -195,8 +195,6 @@ class FileRecordWriterContainer extends
//create base OutputFormat
org.apache.hadoop.mapred.OutputFormat baseOF =
ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),
currTaskContext.getJobConf());
- //check outputSpecs
- baseOF.checkOutputSpecs(null, currTaskContext.getJobConf());
//get Output Committer
org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =
currTaskContext.getJobConf().getOutputCommitter();
//create currJobContext the latest so it gets all the config
changes
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatBaseTest.java
Tue Sep 18 18:48:23 2012
@@ -72,10 +72,10 @@ public class HCatBaseTest {
*/
protected void setUpHiveConf() {
hiveConf = new HiveConf(this.getClass());
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
"false");
- hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
TEST_WAREHOUSE_DIR);
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
TEST_WAREHOUSE_DIR);
}
protected void logAndRegister(PigServer server, String query) throws
IOException {
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
Tue Sep 18 18:48:23 2012
@@ -25,25 +25,20 @@ import java.util.List;
import java.util.Map;
import junit.framework.Assert;
-import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -55,22 +50,29 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertTrue;
+
/**
* Test for HCatOutputFormat. Writes a partition using HCatOutputFormat and
reads
* it back using HCatInputFormat, checks the column values and counts.
*/
-public abstract class HCatMapReduceTest extends TestCase {
+public abstract class HCatMapReduceTest extends HCatBaseTest {
private static final Logger LOG =
LoggerFactory.getLogger(HCatMapReduceTest.class);
- protected String dbName = "default";
- protected String tableName = "testHCatMapReduceTable";
+ protected static String dbName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+ protected static String tableName = "testHCatMapReduceTable";
protected String inputFormat = RCFileInputFormat.class.getName();
protected String outputFormat = RCFileOutputFormat.class.getName();
@@ -79,52 +81,30 @@ public abstract class HCatMapReduceTest
private static List<HCatRecord> writeRecords = new ArrayList<HCatRecord>();
private static List<HCatRecord> readRecords = new ArrayList<HCatRecord>();
- protected abstract void initialize() throws Exception;
-
protected abstract List<FieldSchema> getPartitionKeys();
protected abstract List<FieldSchema> getTableColumns();
- private HiveMetaStoreClient client;
- protected HiveConf hiveConf;
-
- private FileSystem fs;
- private String thriftUri = null;
-
- protected Driver driver;
-
- @Override
- protected void setUp() throws Exception {
- hiveConf = new HiveConf(this.getClass());
-
- //The default org.apache.hadoop.hive.ql.hooks.PreExecutePrinter hook
- //is present only in the ql/test directory
- hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
- hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname,
"false");
- driver = new Driver(hiveConf);
- SessionState.start(new CliSessionState(hiveConf));
-
- thriftUri = System.getenv("HCAT_METASTORE_URI");
-
- if (thriftUri != null) {
- LOG.info("Using URI {}", thriftUri);
-
- hiveConf.set("hive.metastore.local", "false");
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
- }
+ private static FileSystem fs;
+ @BeforeClass
+ public static void setUpOneTime() throws Exception {
fs = new LocalFileSystem();
fs.initialize(fs.getWorkingDirectory().toUri(), new Configuration());
- initialize();
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.setInt(HCatConstants.HCAT_HIVE_CLIENT_EXPIRY_TIME, 0);
+ // Hack to initialize cache with 0 expiry time causing it to return a
new hive client every time
+ // Otherwise the cache doesn't play well with the second test method
when the client gets closed() in the
+ // tearDown() of the previous test
+ HCatUtil.getHiveClient(hiveConf);
- client = new HiveMetaStoreClient(hiveConf, null);
- initTable();
+ MapCreate.writeCount = 0;
+ MapRead.readCount = 0;
}
- @Override
- protected void tearDown() throws Exception {
+ @After
+ public void deleteTable() throws Exception {
try {
String databaseName = (dbName == null) ?
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
@@ -133,13 +113,10 @@ public abstract class HCatMapReduceTest
e.printStackTrace();
throw e;
}
-
- client.close();
}
-
- private void initTable() throws Exception {
-
+ @Before
+ public void createTable() throws Exception {
String databaseName = (dbName == null) ?
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
try {
@@ -237,6 +214,23 @@ public abstract class HCatMapReduceTest
Job runMRCreate(Map<String, String> partitionValues,
List<HCatFieldSchema> partitionColumns, List<HCatRecord>
records,
int writeCount, boolean assertWrite) throws Exception {
+ return runMRCreate(partitionValues, partitionColumns, records,
writeCount, assertWrite, true);
+ }
+
+ /**
+ * Run a local map reduce job to load data from in memory records to an
HCatalog Table
+ * @param partitionValues
+ * @param partitionColumns
+ * @param records data to be written to HCatalog table
+ * @param writeCount
+ * @param assertWrite
+ * @param asSingleMapTask
+ * @return
+ * @throws Exception
+ */
+ Job runMRCreate(Map<String, String> partitionValues,
+ List<HCatFieldSchema> partitionColumns, List<HCatRecord>
records,
+ int writeCount, boolean assertWrite, boolean
asSingleMapTask) throws Exception {
writeRecords = records;
MapCreate.writeCount = 0;
@@ -249,10 +243,22 @@ public abstract class HCatMapReduceTest
// input/output settings
job.setInputFormatClass(TextInputFormat.class);
- Path path = new Path(fs.getWorkingDirectory(),
"mapred/testHCatMapReduceInput");
- createInputFile(path, writeCount);
+ if (asSingleMapTask) {
+ // One input path would mean only one map task
+ Path path = new Path(fs.getWorkingDirectory(),
"mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount);
+ TextInputFormat.setInputPaths(job, path);
+ } else {
+ // Create two input paths so that two map tasks get triggered.
There could be other ways
+ // to trigger two map tasks.
+ Path path = new Path(fs.getWorkingDirectory(),
"mapred/testHCatMapReduceInput");
+ createInputFile(path, writeCount / 2);
- TextInputFormat.setInputPaths(job, path);
+ Path path2 = new Path(fs.getWorkingDirectory(),
"mapred/testHCatMapReduceInput2");
+ createInputFile(path2, (writeCount - writeCount / 2));
+
+ TextInputFormat.setInputPaths(job, path, path2);
+ }
job.setOutputFormatClass(HCatOutputFormat.class);
@@ -294,6 +300,13 @@ public abstract class HCatMapReduceTest
return runMRRead(readCount, null);
}
+ /**
+ * Run a local map reduce job to read records from HCatalog table and
verify if the count is as expected
+ * @param readCount
+ * @param filter
+ * @return
+ * @throws Exception
+ */
List<HCatRecord> runMRRead(int readCount, String filter) throws Exception {
MapRead.readCount = 0;
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -34,31 +34,37 @@ import org.apache.hcatalog.data.DefaultH
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- private List<HCatFieldSchema> dataColumns;
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> dataColumns;
private static final Logger LOG =
LoggerFactory.getLogger(TestHCatDynamicPartitioned.class);
+ private static final int NUM_RECORDS = 20;
+ private static final int NUM_PARTITIONS = 5;
- @Override
- protected void initialize() throws Exception {
-
+ @BeforeClass
+ public static void generateInputData() throws Exception {
tableName = "testHCatDynamicPartitionedTable";
- generateWriteRecords(20, 5, 0);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
generateDataColumns();
}
- private void generateDataColumns() throws HCatException {
+ private static void generateDataColumns() throws HCatException {
dataColumns = new ArrayList<HCatFieldSchema>();
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
}
- private void generateWriteRecords(int max, int mod, int offset) {
+ private static void generateWriteRecords(int max, int mod, int offset) {
writeRecords = new ArrayList<HCatRecord>();
for (int i = 0; i < max; i++) {
@@ -86,13 +92,29 @@ public class TestHCatDynamicPartitioned
return fields;
}
-
+ /**
+ * Run the dynamic partitioning test but with single map task
+ * @throws Exception
+ */
+ @Test
public void testHCatDynamicPartitionedTable() throws Exception {
+ runHCatDynamicPartitionedTable(true);
+ }
+
+ /**
+ * Run the dynamic partitioning test but with multiple map task. See
HCATALOG-490
+ * @throws Exception
+ */
+ @Test
+ public void testHCatDynamicPartitionedTableMultipleTask() throws Exception
{
+ runHCatDynamicPartitionedTable(false);
+ }
- generateWriteRecords(20, 5, 0);
- runMRCreate(null, dataColumns, writeRecords, 20, true);
+ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask)
throws Exception {
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true,
asSingleMapTask);
- runMRRead(20);
+ runMRRead(NUM_RECORDS);
//Read with partition filter
runMRRead(4, "p1 = \"0\"");
@@ -110,14 +132,14 @@ public class TestHCatDynamicPartitioned
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
- assertEquals(20, res.size());
+ assertEquals(NUM_RECORDS, res.size());
//Test for duplicate publish
IOException exc = null;
try {
- generateWriteRecords(20, 5, 0);
- Job job = runMRCreate(null, dataColumns, writeRecords, 20, false);
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ Job job = runMRCreate(null, dataColumns, writeRecords,
NUM_RECORDS, false);
if (HcatTestUtils.isHadoop23()) {
new FileOutputCommitterContainer(job, null).cleanupJob(job);
}
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -32,14 +32,19 @@ import org.apache.hcatalog.data.DefaultH
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestHCatNonPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- List<HCatFieldSchema> partitionColumns;
+ private static List<HCatRecord> writeRecords;
+ static List<HCatFieldSchema> partitionColumns;
- @Override
- protected void initialize() throws HCatException {
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
dbName = null; //test if null dbName works ("default" is used)
tableName = "testHCatNonPartitionedTable";
@@ -75,6 +80,7 @@ public class TestHCatNonPartitioned exte
}
+ @Test
public void testHCatNonPartitionedTable() throws Exception {
Map<String, String> partitionMap = new HashMap<String, String>();
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1387314&r1=1387313&r2=1387314&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
Tue Sep 18 18:48:23 2012
@@ -33,14 +33,19 @@ import org.apache.hcatalog.data.HCatReco
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestHCatPartitioned extends HCatMapReduceTest {
- private List<HCatRecord> writeRecords;
- private List<HCatFieldSchema> partitionColumns;
+ private static List<HCatRecord> writeRecords;
+ private static List<HCatFieldSchema> partitionColumns;
- @Override
- protected void initialize() throws Exception {
+ @BeforeClass
+ public static void oneTimeSetUp() throws Exception {
tableName = "testHCatPartitionedTable";
writeRecords = new ArrayList<HCatRecord>();
@@ -77,6 +82,7 @@ public class TestHCatPartitioned extends
}
+ @Test
public void testHCatPartitionedTable() throws Exception {
Map<String, String> partitionMap = new HashMap<String, String>();