pradeepkth
Wed, 03 Feb 2010 16:59:34 -0800
Author: pradeepkth
Date: Thu Feb 4 00:59:10 2010
New Revision: 906314

URL: http://svn.apache.org/viewvc?rev=906314&view=rev
Log:
PIG-1090: additional patch to handle calling setStoreSchema of StoreMetadata in local mode (pradeepkth)

Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Feb 4 00:59:10 2010
@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.ExecType;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
@@ -282,6 +283,18 @@
for(Job job : succJobs){
List<POStore> sts = jcc.getStores(job);
for (POStore st: sts) {
+ // Currently (as of Feb 3 2010), hadoop's local mode does not
+ // call cleanupJob on OutputCommitter (see https://issues.apache.org/jira/browse/MAPREDUCE-1447)
+ // So to workaround that bug, we are calling setStoreSchema on
+ // StoreFunc's which implement StoreMetadata here
+ /**********************************************************/
+ // NOTE: THE FOLLOWING IF SHOULD BE REMOVED ONCE MAPREDUCE-1447
+ // IS FIXED - TestStore.testSetStoreSchema() should fail at
+ // that time and removing this code should fix it.
+ /**********************************************************/
+ if(pc.getExecType() == ExecType.LOCAL) {
+ storeSchema(job, st);
+ }
if (!st.isTmpStore()) {
succeededStores.add(st.getSFile());
finalStores++;
@@ -443,6 +456,19 @@
}
/**
+ * @param job
+ * @param st
+ * @throws IOException
+ */
+ private void storeSchema(Job job, POStore st) throws IOException {
+ JobContext jc = new JobContext(job.getJobConf(),
+ new org.apache.hadoop.mapreduce.JobID());
+ JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
+ PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
+ }
+
+
+ /**
* An exception handler class to handle exceptions thrown by the job controller thread
* Its a local class. This is the only mechanism to catch unhandled thread exceptions
* Unhandled exceptions in threads are handled by the VM if the handler is not registered
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Feb 4 00:59:10 2010
@@ -136,7 +136,7 @@
return contextCopy;
}
- private JobContext setUpContext(JobContext context,
+ static JobContext setUpContext(JobContext context,
POStore store) throws IOException {
// make a copy of the context so that the actions after this call
// do not end up updating the same context
@@ -153,7 +153,7 @@
return contextCopy;
}
- private void storeCleanup(POStore store, Configuration conf)
+ static void storeCleanup(POStore store, Configuration conf)
throws IOException {
StoreFunc storeFunc = store.getStoreFunc();
if (storeFunc instanceof StoreMetadata) {
Modified:
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Thu Feb 4 00:59:10 2010
@@ -17,7 +17,6 @@
*/
package org.apache.pig.test;
-import java.io.File;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -25,17 +24,31 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
-import java.util.Map.Entry;
import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
@@ -44,12 +57,6 @@
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.pen.physicalOperators.POCounter;
-import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -57,8 +64,10 @@
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.pen.physicalOperators.POCounter;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.tools.pigstats.PigStats;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -292,6 +301,110 @@
checkStorePath("/tmp/foo/../././","/tmp");
}
+ @Test
+ public void testSetStoreSchema() throws Exception {
+ PigServer ps = null;
+ String storeSchemaOutputFile = outputFileName + "_storeSchema_test";
+ try {
+ ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE,
ExecType.LOCAL};
+ String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
+
+ String script = "a = load '"+ inputFileName + "';" +
+ "store a into '" + outputFileName + "' using " +
+ DummyStore.class.getName() + "();";
+
+ for (ExecType execType : modes) {
+ if(execType == ExecType.MAPREDUCE) {
+ ps = new PigServer(ExecType.MAPREDUCE,
+ cluster.getProperties());
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+ } else {
+ ps = new PigServer(ExecType.LOCAL);
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+ }
+ ps.setBatchOn();
+ Util.createInputFile(ps.getPigContext(),
+ inputFileName, inputData);
+ Util.registerMultiLineQuery(ps, script);
+ ps.executeBatch();
+ assertEquals(
+ "Checking if file indicating that storeSchema was " +
+ "called exists in " + execType + " mode", true,
+ Util.exists(ps.getPigContext(),
storeSchemaOutputFile));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Exception encountered - hence failing:" + e);
+ } finally {
+ Util.deleteFile(ps.getPigContext(), inputFileName);
+ Util.deleteFile(ps.getPigContext(), outputFileName);
+ Util.deleteFile(ps.getPigContext(), storeSchemaOutputFile);
+ }
+ }
+
+ public static class DummyStore implements StoreFunc, StoreMetadata{
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ // we don't really write in the test - so this is just to keep
+ // Pig/hadoop happy
+ return new TextOutputFormat<Long, Text>();
+ }
+
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+
+ }
+
+ @Override
+ public void putNext(Tuple t) throws IOException {
+ // we don't really write anything out
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path
curDir)
+ throws IOException {
+ return location;
+ }
+
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+
+ }
+
+ @Override
+ public void setStoreLocation(String location, Job job)
+ throws IOException {
+ FileOutputFormat.setOutputPath(job, new Path(location));
+ }
+
+ @Override
+ public void storeSchema(ResourceSchema schema, String location,
+ Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ // create a file to test that this method got called - if it gets called
+ // multiple times, the create will throw an Exception
+ fs.create(
+ new Path(conf.get("mapred.output.dir") +
"_storeSchema_test"),
+ false);
+ }
+
+ @Override
+ public void storeStatistics(ResourceStatistics stats, String location,
+ Configuration conf) throws IOException {
+ }
+
+ }
+
private void checkStorePath(String orig, String expected) throws Exception
{
checkStorePath(orig, expected, false);
}
Modified:
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java?rev=906314&r1=906313&r2=906314&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/Util.java Thu Feb 4 00:59:10 2010
@@ -308,6 +308,14 @@
fs.delete(new Path(fileName), true);
}
+ static public boolean exists(PigContext pigContext, String fileName)
+ throws IOException {
+ Configuration conf = ConfigurationUtil.toConfiguration(
+ pigContext.getProperties());
+ FileSystem fs = FileSystem.get(conf);
+ return fs.exists(new Path(fileName));
+ }
+
/**
* Helper function to check if the result of a Pig Query is in line with
* expected results.