Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Thu Nov 27 12:49:54 2014 @@ -187,19 +187,18 @@ public abstract class DefaultAbstractBag private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) { long used = avgTupleSize * numInMem; - // add up the overhead for this object and other object variables - int bag_fix_size = 8 /* object header */ - + 4 + 8 + 8 /* mLastContentsSize + mMemSize + mSize */ - + 8 + 8 /* mContents ref + mSpillFiles ref*/ - + 4 /* +4 to round it to eight*/ - + 36 /* mContents fixed */ - ; long mFields_size = roundToEight(4 + numInMem*4); /* mContents fixed + per entry */ // in java hotspot 32bit vm, there seems to be a minimum bag size of 188 bytes // some of the extra bytes is probably from a minimum size of this array list mFields_size = Math.max(40, mFields_size); - used += bag_fix_size + mFields_size; + // the fixed overhead for this object and other object variables = 84 bytes + // 8 - object header + // 4 + 8 + 8 - sampled + aggSampleTupleSize + mSize + // 8 + 8 - mContents ref + mSpillFiles ref + // 4 - spillableRegistered +4 instead of 1 to round it to eight + // 36 - mContents fixed + used += 84 + mFields_size; // add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList, // object variable inside ArrayList and references to spill files @@ -444,7 +443,7 @@ public abstract class DefaultAbstractBag if (reporter != null && reporter.getCounter(counter)!=null) { reporter.getCounter(counter).increment(numRecsSpilled); } else { - PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter); + PigHadoopLogger.getInstance().warn(mContents, "Spill counter incremented", counter); } }
Modified: pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java Thu Nov 27 12:49:54 2014 @@ -45,7 +45,6 @@ import org.apache.pig.impl.util.ObjectSe */ public class DefaultTuple extends AbstractTuple { - protected boolean isNull = false; private static final long serialVersionUID = 2L; protected List<Object> mFields; @@ -165,11 +164,6 @@ public class DefaultTuple extends Abstra @Override public long getMemorySize() { Iterator<Object> i = mFields.iterator(); - // fixed overhead - long empty_tuple_size = 8 /* tuple object header */ - + 8 /* isNull - but rounded to 8 bytes as total obj size needs to be multiple of 8 */ - + 8 /* mFields reference */ - + 32 /* mFields array list fixed size */; // rest of the fixed portion of mfields size is accounted within empty_tuple_size long mfields_var_size = SizeUtil.roundToEight(4 + 4 * mFields.size()); @@ -177,7 +171,11 @@ public class DefaultTuple extends Abstra // which is probably from the minimum size of this array list mfields_var_size = Math.max(40, mfields_var_size); - long sum = empty_tuple_size + mfields_var_size; + // fixed overhead = 48 bytes + //8 - tuple object header + //8 - mFields reference + //32 - mFields array list fixed size + long sum = 48 + mfields_var_size; while (i.hasNext()) { sum += SizeUtil.getPigObjMemSize(i.next()); } Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Thu Nov 27 12:49:54 2014 @@ -516,19 +516,27 @@ public class DistinctDataBag extends Def // the spill files list. So I need to append it to my // linked list as well so that it's still there when I // move my linked list back to the spill files. + DataOutputStream out = null; try { - DataOutputStream out = getSpillFile(); + out = getSpillFile(); ll.add(mSpillFiles.get(mSpillFiles.size() - 1)); Tuple t; while ((t = readFromTree()) != null) { t.write(out); } out.flush(); - out.close(); } catch (IOException ioe) { String msg = "Unable to find our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e); + } + } } } // delete files that have been merged into new files Modified: pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java Thu Nov 27 12:49:54 2014 @@ -36,6 +36,7 @@ import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigConfiguration; +import org.apache.pig.PigWarning; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; @@ -81,7 +82,7 @@ public class InternalDistinctBag extends if (percent < 0) { percent = 0.2F; if (PigMapReduce.sJobConfInternal.get() != null) { - String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE); + String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE); if (usage != null) { percent = Float.parseFloat(usage); } @@ -424,19 +425,25 @@ public class InternalDistinctBag extends // the spill files list. So I need to append it to my // linked list as well so that it's still there when I // move my linked list back to the spill files. + DataOutputStream out = null; try { - DataOutputStream out = getSpillFile(); + out = getSpillFile(); ll.add(mSpillFiles.get(mSpillFiles.size() - 1)); Tuple t; while ((t = readFromTree()) != null) { t.write(out); } out.flush(); - out.close(); } catch (IOException ioe) { String msg = "Unable to find our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); + } finally { + try { + out.close(); + } catch (IOException e) { + warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e); + } } } Modified: pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java Thu Nov 27 12:49:54 2014 @@ -35,6 +35,7 @@ import java.util.PriorityQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigWarning; /** @@ -401,19 +402,27 @@ public class InternalSortedBag extends S // the spill files list. So I need to append it to my // linked list as well so that it's still there when I // move my linked list back to the spill files. + DataOutputStream out = null; try { - DataOutputStream out = getSpillFile(); + out = getSpillFile(); ll.add(mSpillFiles.get(mSpillFiles.size() - 1)); Tuple t; while ((t = readFromPriorityQ()) != null) { t.write(out); } out.flush(); - out.close(); } catch (IOException ioe) { String msg = "Unable to find our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e); + } + } } } // delete files that have been merged into new files Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Thu Nov 27 12:49:54 2014 @@ -17,7 +17,7 @@ */ package org.apache.pig.data; -import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE; +import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED; import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT; import java.io.File; @@ -33,7 +33,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.ExecType; import org.apache.pig.PigConstants; import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; import org.apache.pig.data.utils.StructuresHelper.SchemaKey; @@ -151,8 +150,8 @@ public class SchemaTupleBackend { return; } // Step one is to see if there are any classes in the distributed cache - if (!jConf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) { - LOG.info("Key [" + SHOULD_USE_SCHEMA_TUPLE +"] was not set... will not generate code."); + if (!jConf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) { + LOG.info("Key [" + PIG_SCHEMA_TUPLE_ENABLED +"] was not set... will not generate code."); return; } // Step two is to copy everything from the distributed cache if we are in distributed mode @@ -184,14 +183,22 @@ public class SchemaTupleBackend { LOG.info("Attempting to read file: " + s); // The string is the symlink into the distributed cache File src = new File(s); - FileInputStream fin = new FileInputStream(src); - FileOutputStream fos = new FileOutputStream(new File(codeDir, s)); - - fin.getChannel().transferTo(0, src.length(), fos.getChannel()); + FileInputStream fin = null; + FileOutputStream fos = null; + try { + fin = new FileInputStream(src); + fos = new FileOutputStream(new File(codeDir, s)); - fin.close(); - fos.close(); - LOG.info("Successfully copied file to local directory."); + fin.getChannel().transferTo(0, src.length(), fos.getChannel()); + LOG.info("Successfully copied file to local directory."); + } finally { + if (fin != null) { + fin.close(); + } + if (fos != null) { + fos.close(); + } + } } } Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java Thu Nov 27 12:49:54 2014 @@ -61,27 +61,27 @@ public class SchemaTupleClassGenerator { * This context is used in UDF code. Currently, this is only used for * the inputs to UDF's. */ - UDF (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_UDF, true, GenerateUdf.class), + UDF (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_UDF, true, GenerateUdf.class), /** * This context is for POForEach. This will use the expected output of a ForEach * to return a typed Tuple. */ - FOREACH (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH, true, GenerateForeach.class), + FOREACH (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FOREACH, true, GenerateForeach.class), /** * This context controls whether or not SchemaTuples will be used in FR joins. * Currently, they will be used in the HashMap that FR Joins construct. */ - FR_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN, true, GenerateFrJoin.class), + FR_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FRJOIN, true, GenerateFrJoin.class), /** * This context controls whether or not SchemaTuples will be used in merge joins. */ - MERGE_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class), + MERGE_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class), /** * All registered Schemas will also be registered in one additional context. * This context will allow users to "force" the load of a SchemaTupleFactory * if one is present in any context. */ - FORCE_LOAD (PigConfiguration.SCHEMA_TUPLE_SHOULD_ALLOW_FORCE, true, GenerateForceLoad.class); + FORCE_LOAD (PigConfiguration.PIG_SCHEMA_TUPLE_ALLOW_FORCE, true, GenerateForceLoad.class); /** * These annotations are used to mark a given SchemaTuple with @@ -226,7 +226,7 @@ public class SchemaTupleClassGenerator { */ //TODO in the future, we can use ASM to generate the bytecode directly. private static void compileCodeString(String className, String generatedCodeString, File codeDir) { - JavaCompilerHelper compiler = new JavaCompilerHelper(); + JavaCompilerHelper compiler = new JavaCompilerHelper(); String tempDir = codeDir.getAbsolutePath(); compiler.addToClassPath(tempDir); LOG.debug("Compiling SchemaTuple code with classpath: " + compiler.getClassPath()); @@ -242,12 +242,14 @@ public class SchemaTupleClassGenerator { this.id = id; } + @Override public void prepare() { add("@Override"); add("protected int generatedCodeCompareToSpecific(SchemaTuple_"+id+" t) {"); add(" int i = 0;"); } + @Override public void process(int fieldNum, Schema.FieldSchema fs) { add(" i = compare(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t.checkIfNull_" + fieldNum + "(), t.getPos_" @@ -257,6 +259,7 @@ public class SchemaTupleClassGenerator { add(" }"); } + @Override public void end() { add(" return i;"); add("}"); @@ -271,6 +274,7 @@ public class SchemaTupleClassGenerator { this.id = id; } + @Override public void prepare() { add("@Override"); add("protected int generatedCodeCompareTo(SchemaTuple t, boolean checkType) {"); @@ -282,6 +286,7 @@ public class SchemaTupleClassGenerator { boolean compIsNull = false; boolean compByte = false; + @Override public void process(int fieldNum, Schema.FieldSchema fs) { add(" i = compareWithElementAtPos(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t, " + fieldNum + ");"); add(" if (i != 0) {"); @@ -289,6 +294,7 @@ public class SchemaTupleClassGenerator { add(" }"); } + @Override public void end() { add(" return 0;"); add("}"); @@ -296,16 +302,19 @@ public class SchemaTupleClassGenerator { } static class HashCode extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public int generatedCodeHashCode() {"); add(" int h = 17;"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" h = hashCodePiece(h, getPos_" + fieldPos + "(), checkIfNull_" + fieldPos + "());"); } + @Override public void end() { add(" return h;"); add("}"); @@ -323,6 +332,7 @@ public class SchemaTupleClassGenerator { private int booleans = 0; private File codeDir; + @Override public void prepare() { String s; try { @@ -333,6 +343,7 @@ public class SchemaTupleClassGenerator { add("private static Schema schema = staticSchemaGen(\"" + s + "\");"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (!isTuple()) { if (isPrimitive() && (primitives++ % 8 == 0)) { @@ -385,6 +396,7 @@ public class SchemaTupleClassGenerator { private int byteField = 0; //this is for setting booleans private int byteIncr = 0; //this is for counting the booleans we've encountered + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (!isTuple()) { add("public void setPos_"+fieldPos+"("+typeName()+" v) {"); @@ -433,27 +445,32 @@ public class SchemaTupleClassGenerator { } static class ListSetString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public void generatedCodeSetIterator(Iterator<Object> it) throws ExecException {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" setPos_"+fieldPos+"(unbox(it.next(), getDummy_"+fieldPos+"()));"); } + @Override public void end() { add("}"); } } static class GenericSetString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public void generatedCodeSetField(int fieldNum, Object val) throws ExecException {"); add(" switch (fieldNum) {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" case ("+fieldPos+"):"); add(" if (val == null) {"); @@ -464,6 +481,7 @@ public class SchemaTupleClassGenerator { add(" break;"); } + @Override public void end() { add(" default:"); add(" throw new ExecException(\"Invalid index given to set: \" + fieldNum);"); @@ -473,16 +491,19 @@ public class SchemaTupleClassGenerator { } static class GenericGetString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public Object generatedCodeGetField(int fieldNum) throws ExecException {"); add(" switch (fieldNum) {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"() ? null : box(getPos_"+fieldPos+"());"); } + @Override public void end() { add(" default: throw new ExecException(\"Invalid index given to get: \" + fieldNum);"); add(" }"); @@ -491,16 +512,19 @@ public class SchemaTupleClassGenerator { } static class GeneralIsNullString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public boolean isGeneratedCodeFieldNull(int fieldNum) throws ExecException {"); add(" switch (fieldNum) {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" case ("+fieldPos+"): return checkIfNull_"+fieldPos+"();"); } + @Override public void end() { add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);"); add(" }"); @@ -512,6 +536,7 @@ public class SchemaTupleClassGenerator { private int nullByte = 0; //the byte_ val private int byteIncr = 0; //the mask we're on + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add("public boolean checkIfNull_" + fieldPos + "() {"); if (isPrimitive()) { @@ -532,6 +557,7 @@ public class SchemaTupleClassGenerator { private int nullByte = 0; //the byte_ val private int byteIncr = 0; //the mask we're on + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add("public void setNull_"+fieldPos+"(boolean b) {"); if (isPrimitive()) { @@ -554,11 +580,13 @@ public class SchemaTupleClassGenerator { static class SetEqualToSchemaTupleSpecificString extends TypeInFunctionStringOut { private int id; + @Override public void prepare() { add("@Override"); add("protected SchemaTuple generatedCodeSetSpecific(SchemaTuple_"+id+" t) {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add(" if (t.checkIfNull_" + fieldPos + "()) {"); add(" setNull_" + fieldPos + "(true);"); @@ -568,6 +596,7 @@ public class SchemaTupleClassGenerator { addBreak(); } + @Override public void end() { add(" return this;"); add("}"); @@ -586,6 +615,7 @@ public class SchemaTupleClassGenerator { this.id = id; } + @Override public void prepare() { add("@Override"); add("public boolean isSpecificSchemaTuple(Object o) {"); @@ -599,15 +629,18 @@ public class SchemaTupleClassGenerator { static class WriteNullsString extends TypeInFunctionStringOut { String s = " boolean[] b = {\n"; + @Override public void prepare() { add("@Override"); add("protected boolean[] generatedCodeNullsArray() throws IOException {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { s += " checkIfNull_"+fieldPos+"(),\n"; } + @Override public void end() { s = s.substring(0, s.length() - 2) + "\n };"; add(s); @@ -626,11 +659,13 @@ public class SchemaTupleClassGenerator { private int booleans = 0; + @Override public void prepare() { add("@Override"); add("protected void generatedCodeReadFields(DataInput in, boolean[] b) throws IOException {"); } + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (isBoolean()) { booleans++; @@ -659,6 +694,7 @@ public class SchemaTupleClassGenerator { } } + @Override public void end() { if (booleans > 0) { int i = 0; @@ -679,6 +715,7 @@ public class SchemaTupleClassGenerator { static class WriteString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("protected void generatedCodeWriteElements(DataOutput out) throws IOException {"); @@ -686,6 +723,7 @@ public class SchemaTupleClassGenerator { private int booleans = 0; + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (isBoolean()) { booleans++; @@ -697,6 +735,7 @@ public class SchemaTupleClassGenerator { } } + @Override public void end() { if (booleans > 0) { int i = 0; @@ -716,6 +755,7 @@ public class SchemaTupleClassGenerator { String s = " return SizeUtil.roundToEight("; + @Override public void prepare() { add("@Override"); add("public long getGeneratedCodeMemorySize() {"); @@ -725,6 +765,7 @@ public class SchemaTupleClassGenerator { private int primitives = 0; //TODO a null array or object variable still takes up space for the pointer, yes? + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (isInt() || isFloat()) { size += 4; @@ -757,6 +798,7 @@ public class SchemaTupleClassGenerator { } } + @Override public void end() { s += size + ");"; add(s); @@ -766,6 +808,7 @@ public class SchemaTupleClassGenerator { } static class GetDummyString extends TypeInFunctionStringOut { + @Override public void process(int fieldPos, Schema.FieldSchema fs) { add("public "+typeName()+" getDummy_"+fieldPos+"() {"); switch (fs.type) { @@ -795,6 +838,7 @@ public class SchemaTupleClassGenerator { private int booleanByte = 0; private int booleans; + @Override public void process(int fieldPos, Schema.FieldSchema fs) { if (!isTuple()) { add("public "+typeName()+" getPos_"+fieldPos+"() {"); @@ -823,6 +867,7 @@ public class SchemaTupleClassGenerator { static class GetSchemaTupleIdentifierString extends TypeInFunctionStringOut { private int id; + @Override public void end() { add("@Override"); add("public int getSchemaTupleIdentifier() {"); @@ -839,10 +884,12 @@ public class SchemaTupleClassGenerator { static class SchemaSizeString extends TypeInFunctionStringOut { int i = 0; + @Override public void process(int fieldNum, Schema.FieldSchema fS) { i++; } + @Override public void end() { add("@Override"); add("protected int schemaSize() {"); @@ -855,10 +902,12 @@ public class SchemaTupleClassGenerator { static class SizeString extends TypeInFunctionStringOut { int i = 0; + @Override public void process(int fieldNum, Schema.FieldSchema fS) { i++; } + @Override public void end() { add("@Override"); add("protected int generatedCodeSize() {"); @@ -873,16 +922,19 @@ public class SchemaTupleClassGenerator { } static class GetTypeString extends TypeInFunctionStringOut { + @Override public void prepare() { add("@Override"); add("public byte getGeneratedCodeFieldType(int fieldNum) throws ExecException {"); add(" switch (fieldNum) {"); } + @Override public void process(int fieldNum, Schema.FieldSchema fs) { add(" case ("+fieldNum+"): return "+fs.type+";"); } + @Override public void end() { add(" default: throw new ExecException(\"Invalid index given: \" + fieldNum);"); add(" }"); @@ -898,6 +950,7 @@ public class SchemaTupleClassGenerator { this.id = id; } + @Override public void prepare() { add("@Override"); add("protected SchemaTuple generatedCodeSet(SchemaTuple t, boolean checkClass) throws ExecException {"); @@ -913,6 +966,7 @@ public class SchemaTupleClassGenerator { addBreak(); } + @Override public void process(int fieldNum, Schema.FieldSchema fs) { add(" if ("+fs.type+" != theirFS.get("+fieldNum+").type) {"); add(" throw new ExecException(\"Given SchemaTuple does not match current in field " + fieldNum + ". Expected type: " + fs.type + ", found: \" + theirFS.get("+fieldNum+").type);"); @@ -929,6 +983,7 @@ public class SchemaTupleClassGenerator { addBreak(); } + @Override public void end() { add(" return this;"); add("}"); @@ -940,18 +995,21 @@ public class SchemaTupleClassGenerator { super(type); } + @Override public void prepare() { add("@Override"); add("protected "+name()+" generatedCodeGet"+properName()+"(int fieldNum) throws ExecException {"); add(" switch(fieldNum) {"); } + @Override public void process(int fieldNum, Schema.FieldSchema fs) { if (fs.type==thisType()) { add(" case ("+fieldNum+"): return returnUnlessNull(checkIfNull_"+fieldNum+"(), getPos_"+fieldNum+"());"); } } + @Override public void end() { add(" default:"); add(" return unbox"+properName()+"(getTypeAwareBase(fieldNum, \""+name()+"\"));"); @@ -979,17 +1037,20 @@ public class SchemaTupleClassGenerator { return proper(thisType()); } + @Override public void prepare() { add("@Override"); add("protected void generatedCodeSet"+properName()+"(int fieldNum, "+name()+" val) throws ExecException {"); add(" switch(fieldNum) {"); } + @Override public void process(int fieldNum, Schema.FieldSchema fs) { if (fs.type==thisType()) add(" case ("+fieldNum+"): setPos_"+fieldNum+"(val); break;"); } + @Override public void end() { add(" default: setTypeAwareBase(fieldNum, val, \""+name()+"\");"); add(" }"); Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java Thu Nov 27 12:49:54 2014 @@ -17,7 +17,7 @@ */ package org.apache.pig.data; -import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE; +import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED; import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY; import static org.apache.pig.PigConstants.LOCAL_CODE_DIR; import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT; @@ -177,8 +177,8 @@ public class SchemaTupleFrontend { */ private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) { boolean filesToShip = false; - if (!conf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) { - LOG.info("Key ["+SHOULD_USE_SCHEMA_TUPLE+"] is false, will not generate code."); + if (!conf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) { + LOG.info("Key ["+PIG_SCHEMA_TUPLE_ENABLED+"] is false, will not generate code."); return false; } LOG.info("Generating all registered Schemas."); Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Thu Nov 27 12:49:54 2014 @@ -63,7 +63,7 @@ public abstract class SelfSpillBag exten maxMem = Runtime.getRuntime().maxMemory(); if (PigMapReduce.sJobConfInternal.get() != null) { String usage = PigMapReduce.sJobConfInternal.get().get( - PigConfiguration.PROP_CACHEDBAG_MEMUSAGE); + PigConfiguration.PIG_CACHEDBAG_MEMUSAGE); if (usage != null) { cachedMemUsage = Float.parseFloat(usage); } Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Thu Nov 27 12:49:54 2014 @@ -491,19 +491,25 @@ public class SortedDataBag extends Defau // the spill files list. So I need to append it to my // linked list as well so that it's still there when I // move my linked list back to the spill files. + DataOutputStream out = null; try { - DataOutputStream out = getSpillFile(); + out = getSpillFile(); ll.add(mSpillFiles.get(mSpillFiles.size() - 1)); Tuple t; while ((t = readFromPriorityQ()) != null) { t.write(out); } out.flush(); - out.close(); } catch (IOException ioe) { String msg = "Unable to find our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); + } finally { + try { + out.close(); + } catch (IOException e) { + warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e); + } } } // delete files that have been merged into new files Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Thu Nov 27 12:49:54 2014 @@ -48,13 +48,13 @@ import org.antlr.runtime.tree.Tree; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.Level; import org.apache.pig.ExecType; import org.apache.pig.ExecTypeProvider; import org.apache.pig.FuncSpec; -import org.apache.pig.Main; +import org.apache.pig.JVMReuseManager; import org.apache.pig.PigException; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.datastorage.DataStorageException; import org.apache.pig.backend.datastorage.ElementDescriptor; @@ -65,7 +65,6 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.impl.streaming.ExecutableManager; import org.apache.pig.impl.streaming.StreamingCommand; -import org.apache.pig.impl.util.JarManager; import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; import org.apache.pig.tools.parameters.ParseException; import org.apache.pig.tools.parameters.PreprocessorContext; @@ -105,7 +104,9 @@ public class PigContext implements Seria * Resources for the job (jars, scripting udf files, cached macro abstract syntax trees) */ - // extra jar files that are needed to run a job + // Jar files that are global to the whole Pig script, includes + // 1. registered jars + // 2. Jars defined in -Dpig.additional.jars transient public List<URL> extraJars = new LinkedList<URL>(); // original paths each extra jar came from @@ -115,10 +116,6 @@ public class PigContext implements Seria // jars needed for scripting udfs - jython.jar etc transient public List<String> scriptJars = new ArrayList<String>(2); - // jars that should not be merged in. - // (some functions may come from pig.jar and we don't want the whole jar file.) - transient public Vector<String> skipJars = new Vector<String>(2); - // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets). transient public Vector<String> predeployedJars = new Vector<String>(2); @@ -174,6 +171,15 @@ public class PigContext implements Seria // List of paths skipped for automatic shipping List<String> skippedShipPaths = new ArrayList<String>(); + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + packageImportList.set(null); + } + /** * extends URLClassLoader to allow adding to classpath as new jars * are registered. @@ -260,13 +266,6 @@ public class PigContext implements Seria this.properties = properties; this.properties.setProperty("exectype", this.execType.name()); - String pigJar = JarManager.findContainingJar(Main.class); - String hadoopJar = JarManager.findContainingJar(FileSystem.class); - if (pigJar != null) { - addSkipJar(pigJar); - if (!pigJar.equals(hadoopJar)) - addSkipJar(hadoopJar); - } this.executionEngine = execType.getExecutionEngine(this); @@ -345,12 +344,6 @@ public class PigContext implements Seria } } - public void addSkipJar(String path) { - if (path != null && !skipJars.contains(path)) { - skipJars.add(path); - } - } - public void addJar(String path) throws MalformedURLException { if (path != null) { URL resource = (new File(path)).toURI().toURL(); @@ -409,6 +402,7 @@ public class PigContext implements Seria public String doParamSubstitution(BufferedReader reader) throws IOException { try { + preprocessorContext.setPigContext(this); preprocessorContext.loadParamVal(params, paramFiles); ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(preprocessorContext); Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Thu Nov 27 12:49:54 2014 @@ -43,7 +43,24 @@ public class PigImplConstants { public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local"; /** + * Used by pig to indicate that current job has been converted to run in fetch mode + */ + public static final String CONVERTED_TO_FETCH = "pig.job.converted.fetch"; + + /** * Indicate the split index of the task. Used by merge cogroup */ public static final String PIG_SPLIT_INDEX = "pig.split.index"; + + /** + * Parallelism for the reducer + */ + public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel"; + public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel"; + public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel"; + + /** + * Parallelism to be used for CROSS operation by GFCross UDF + */ + public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism"; } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Nov 27 12:49:54 2014 @@ -173,7 +173,7 @@ public class FindQuantiles extends EvalF } long numSamples = samples.size(); double toSkip = (double)numSamples / numQuantiles; - if(toSkip == 0) { + if(toSkip < 1) { // numSamples is < numQuantiles; // set numQuantiles to numSamples numQuantiles = (int)numSamples; Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Thu Nov 27 12:49:54 2014 @@ -22,13 +22,12 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.pig.EvalFunc; -import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.UDFContext; @@ -56,16 +55,19 @@ public class GFCross extends EvalFunc<Da parallelism = DEFAULT_PARALLELISM; Configuration cfg = UDFContext.getUDFContext().getJobConf(); if (cfg != null) { - String s = cfg.get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey); + String s = cfg.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey); if (s == null) { throw new IOException("Unable to get parallelism hint from job conf"); } parallelism = Integer.valueOf(s); + if (parallelism < 0) { + throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey + " was " + parallelism); + } } numInputs = (Integer)input.get(0); myNumber = (Integer)input.get(1); - + numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs)); numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1); } @@ -73,21 +75,21 @@ public class GFCross extends EvalFunc<Da DataBag output = mBagFactory.newDefaultBag(); try{ - + int[] digits = new int[numInputs]; digits[myNumber] = r.nextInt(numGroupsPerInput); for (int i=0; i<numGroupsGoingTo; i++){ output.add(toTuple(digits)); next(digits); - } - + } + return output; }catch(ExecException e){ throw e; } } - + private Tuple toTuple(int[] digits) throws IOException, ExecException{ Tuple t = mTupleFactory.newTuple(numInputs); for (int i=0; i<numInputs; i++){ @@ -95,7 +97,7 @@ public class GFCross extends EvalFunc<Da } return t; } - + private void next(int[] digits){ for (int i=0; i<numInputs; i++){ if (i== myNumber) Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 27 12:49:54 2014 @@ -172,8 +172,8 @@ public class PoissonSampleLoader extends newSample = null; Configuration conf = split.getConf(); - sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE); - heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL, + sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE); + heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE); } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java Thu Nov 27 12:49:54 2014 @@ -206,7 +206,7 @@ public class StreamingUDF extends EvalFu filePath.substring(0, lastSeparator - 1); command[UDF_NAME] = funcName; String fileCachePath = jobDir + filePath.substring(0, lastSeparator); - command[PATH_TO_FILE_CACHE] = "\"" + fileCachePath + "\""; + command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'"; command[STD_OUT_OUTPUT_PATH] = outFileName; command[STD_ERR_OUTPUT_PATH] = errOutFileName; command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName; @@ -227,7 +227,8 @@ public class StreamingUDF extends EvalFu File userUdfFile = new File(fileCachePath + command[UDF_FILE_NAME] + getUserFileExtension()); if (!userUdfFile.exists()) { - String absolutePath = filePath.startsWith("/") ? filePath : File.separator + filePath; + String absolutePath = filePath.startsWith("/") ? filePath : "/" + filePath; + absolutePath = absolutePath.replaceAll(":", ""); String controllerDir = new File(command[PATH_TO_CONTROLLER_FILE]).getParent(); String userUdfPath = controllerDir + absolutePath + getUserFileExtension(); userUdfFile = new File(userUdfPath); Modified: pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java Thu Nov 27 12:49:54 2014 @@ -478,15 +478,15 @@ public class FileLocalizer { * since resourthPath should be available in the entire session * * @param pigContext - * @return + * @return temporary resource path * @throws DataStorageException */ - public static synchronized ContainerDescriptor getTemporaryResourcePath(final PigContext pigContext) + public static synchronized Path getTemporaryResourcePath(final PigContext pigContext) throws DataStorageException { if (resourcePath == null) { resourcePath = getTempContainer(pigContext); } - return resourcePath; + return ((HPath)resourcePath).getPath(); } private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext) @@ -787,6 +787,9 @@ public class FileLocalizer { boolean multipleFiles) throws IOException { Path path = new Path(filePath); + if (path.getName().isEmpty()) { + return new FetchFileRet[0]; + } URI uri = path.toUri(); Configuration conf = new Configuration(); ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties)); @@ -800,7 +803,7 @@ public class FileLocalizer { && uri.getScheme() == null )|| // For Windows local files (uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) || - (uri.getScheme() != null && uri.getScheme().equals("local")) + (uri.getScheme() != null && uri.getScheme().equals("local")) ) { srcFs = localFs; } else { @@ -859,14 +862,20 @@ public class FileLocalizer { dest.getParentFile().mkdirs(); dest.deleteOnExit(); - OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest)); - byte[] buffer = new byte[1024]; - int len; - while ((len=resourceStream.read(buffer)) > 0) { - outputStream.write(buffer,0,len); + OutputStream outputStream = null; + try { + outputStream = new BufferedOutputStream(new FileOutputStream(dest)); + byte[] buffer = new byte[1024]; + int len; + while ((len=resourceStream.read(buffer)) > 0) { + outputStream.write(buffer,0,len); + } + } finally { + resourceStream.close(); + if (outputStream != null) { + outputStream.close(); + } } - outputStream.close(); - localFileRet = new FetchFileRet(dest,false); } else Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Nov 27 12:49:54 2014 @@ -213,6 +213,9 @@ public class ReadToEndLoader extends Loa // input completely PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, new ArrayList<OperatorKey>(), -1); + // Set the conf object so that if the wrappedLoadFunc uses it, + // it won't be null + pigSplit.setConf(conf); wrappedLoadFunc.prepareToRead(reader, pigSplit); return true; } Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Thu Nov 27 12:49:54 2014 @@ -47,10 +47,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; import org.apache.tools.bzip2r.BZip2Constants; -import org.codehaus.jackson.annotate.JsonPropertyOrder; -import org.codehaus.jackson.map.annotate.JacksonStdImpl; import org.joda.time.DateTime; import com.google.common.collect.Multimaps; @@ -68,8 +67,6 @@ public class JarManager { AUTOMATON(Automaton.class), ANTLR(CommonTokenStream.class), GUAVA(Multimaps.class), - JACKSON_CORE(JsonPropertyOrder.class), - JACKSON_MAPPER(JacksonStdImpl.class), JODATIME(DateTime.class); private final Class pkgClass; @@ -92,9 +89,16 @@ public class JarManager { createPigScriptUDFJar(fos, pigContext, contents); if (!contents.isEmpty()) { - FileInputStream fis = new FileInputStream(scriptUDFJarFile); - String md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis); - fis.close(); + FileInputStream fis = null; + String md5 = null; + try { + fis = new FileInputStream(scriptUDFJarFile); + md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis); + } finally { + if (fis != null) { + fis.close(); + } + } File newScriptUDFJarFile = new File(scriptUDFJarFile.getParent(), "PigScriptUDF-" + md5 + ".jar"); scriptUDFJarFile.renameTo(newScriptUDFJarFile); return newScriptUDFJarFile; @@ -107,15 +111,20 @@ public class JarManager { for (String path: pigContext.scriptFiles) { log.debug("Adding entry " + path + " to job jar" ); InputStream stream = null; - if (new File(path).exists()) { - stream = new FileInputStream(new File(path)); + File inputFile = new File(path); + if (inputFile.exists()) { + stream = new FileInputStream(inputFile); } else { stream = PigContext.getClassLoader().getResourceAsStream(path); } if (stream==null) { throw new IOException("Cannot find " + path); } - addStream(jarOutputStream, path, stream, contents); + try { + addStream(jarOutputStream, path, stream, contents, inputFile.lastModified()); + } finally { + stream.close(); + } } for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) { log.debug("Adding entry " + entry.getKey() + " to job jar" ); @@ -128,7 +137,11 @@ public class JarManager { if (stream==null) { throw new IOException("Cannot find " + entry.getValue().getPath()); } - addStream(jarOutputStream, entry.getKey(), stream, contents); + try { + addStream(jarOutputStream, entry.getKey(), stream, contents, entry.getValue().lastModified()); + } finally { + stream.close(); + } } if (!contents.isEmpty()) { jarOutputStream.close(); @@ -139,7 +152,7 @@ public class JarManager { /** * Creates a Classloader based on the passed jarFile and any extra jar files. - * + * * @param jarFile * the jar file to be part of the newly created Classloader. This jar file plus any * jars in the extraJars list will constitute the classpath. @@ -161,7 +174,7 @@ public class JarManager { /** * Adds a stream to a Jar file. - * + * * @param os * the OutputStream of the Jar file to which the stream will be added. * @param name @@ -171,15 +184,20 @@ public class JarManager { * @param contents * the current contents of the Jar file. (We use this to avoid adding two streams * with the same name. + * @param timestamp + * timestamp of the entry * @throws IOException */ - private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents) + private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents, + long timestamp) throws IOException { if (contents.get(name) != null) { return; } contents.put(name, ""); - os.putNextEntry(new JarEntry(name)); + JarEntry entry = new JarEntry(name); + entry.setTime(timestamp); + os.putNextEntry(entry); byte buffer[] = new byte[4096]; int rc; while ((rc = is.read(buffer)) > 0) { @@ -190,6 +208,9 @@ public class JarManager { public static List<String> getDefaultJars() { List<String> defaultJars = new ArrayList<String>(); for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) { + if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) { + continue; //Skip + } String jar = findContainingJar(pkgToSend.getPkgClass()); if (!defaultJars.contains(jar)) { defaultJars.add(jar); @@ -201,7 +222,7 @@ public class JarManager { /** * Find a jar that contains a class of the same name, if any. It will return a jar file, even if * that is not the first thing on the class path that has a class with the same name. - * + * * @param my_class * the class to find * @return a jar file that contains the class, or null @@ -243,12 +264,12 @@ public class JarManager { } return null; } - + /** * Add the jars containing the given classes to the job's configuration * such that JobClient will ship them to the cluster and add them to * the DistributedCache - * + * * @param job * Job object * @param classes @@ -266,10 +287,10 @@ public class JarManager { return; conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0]))); } - + /** - * Add the qualified path name of jars containing the given classes - * + * Add the qualified path name of jars containing the given classes + * * @param fs * FileSystem object * @param jars Modified: pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java Thu Nov 27 12:49:54 2014 @@ -145,9 +145,9 @@ public class PropertiesUtil { properties.setProperty("stop.on.failure", ""+false); } - if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) { + if (properties.getProperty(PigConfiguration.PIG_OPT_FETCH) == null) { //by default fetch optimization is on - properties.setProperty(PigConfiguration.OPT_FETCH, ""+true); + properties.setProperty(PigConfiguration.PIG_OPT_FETCH, ""+true); } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Thu Nov 27 12:49:54 2014 @@ -43,38 +43,38 @@ import org.apache.commons.logging.LogFac * <p> * Low memory is defined as more than 50% of the tenured pool being allocated. Spillable objects are * tracked using WeakReferences so that the objects can be GCed even though this class has a reference - * to them. + * to them. * */ public class SpillableMemoryManager implements NotificationListener { - + private final Log log = LogFactory.getLog(getClass()); - + LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); - - // if we freed at least this much, invoke GC + + // if we freed at least this much, invoke GC // (default 40 MB - this can be overridden by user supplied property) private static long gcActivationSize = 40000000L ; - + // spill file size should be at least this much // (default 5MB - this can be overridden by user supplied property) private static long spillFileSizeThreshold = 5000000L ; - + // this will keep track of memory freed across spills // and between GC invocations private static long accumulatedFreeSize = 0L; - + // fraction of biggest heap for which we want to get // "memory usage threshold exceeded" notifications private static double memoryThresholdFraction = 0.7; - + // fraction of biggest heap for which we want to get // "collection threshold exceeded" notifications private static double collectionMemoryThresholdFraction = 0.5; - + // log notification on usage threshold exceeded only the first time private boolean firstUsageThreshExceededLogged = false; - + // log notification on collection threshold exceeded only the first time private boolean firstCollectionThreshExceededLogged = false; @@ -82,54 +82,52 @@ public class SpillableMemoryManager impl // if we want to perform an extra gc before the spill private static double extraGCThresholdFraction = 0.05; private static long extraGCSpillSizeThreshold = 0L; - + private static volatile SpillableMemoryManager manager; private SpillableMemoryManager() { ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null); List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans(); - MemoryPoolMXBean biggestHeap = null; - long biggestSize = 0; + MemoryPoolMXBean tenuredHeap = null; + long tenuredHeapSize = 0; long totalSize = 0; - for (MemoryPoolMXBean b: mpbeans) { - log.debug("Found heap (" + b.getName() + - ") of type " + b.getType()); - if (b.getType() == MemoryType.HEAP) { - /* Here we are making the leap of faith that the biggest - * heap is the tenured heap - */ - long size = b.getUsage().getMax(); + for (MemoryPoolMXBean pool : mpbeans) { + log.debug("Found heap (" + pool.getName() + ") of type " + pool.getType()); + if (pool.getType() == MemoryType.HEAP) { + long size = pool.getUsage().getMax(); totalSize += size; - if (size > biggestSize) { - biggestSize = size; - biggestHeap = b; + // CMS Old Gen or "tenured" is the only heap that supports + // setting usage threshold. + if (pool.isUsageThresholdSupported()) { + tenuredHeapSize = size; + tenuredHeap = pool; } } } extraGCSpillSizeThreshold = (long) (totalSize * extraGCThresholdFraction); - if (biggestHeap == null) { + if (tenuredHeap == null) { throw new RuntimeException("Couldn't find heap"); } log.debug("Selected heap to monitor (" + - biggestHeap.getName() + ")"); - - // we want to set both collection and usage threshold alerts to be + tenuredHeap.getName() + ")"); + + // we want to set both collection and usage threshold alerts to be // safe. In some local tests after a point only collection threshold // notifications were being sent though usage threshold notifications // were sent early on. So using both would ensure that // 1) we get notified early (though usage threshold exceeded notifications) // 2) we get notified always when threshold is exceeded (either usage or // collection) - + /* We set the threshold to be 50% of tenured since that is where * the GC starts to dominate CPU time according to Sun doc */ - biggestHeap.setCollectionUsageThreshold((long)(biggestSize * collectionMemoryThresholdFraction)); + tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * collectionMemoryThresholdFraction)); // we set a higher threshold for usage threshold exceeded notification // since this is more likely to be effective sooner and we do not // want to be spilling too soon - biggestHeap.setUsageThreshold((long)(biggestSize * memoryThresholdFraction)); + tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * memoryThresholdFraction)); } - + public static SpillableMemoryManager getInstance() { if (manager == null) { manager = new SpillableMemoryManager(); @@ -138,21 +136,21 @@ public class SpillableMemoryManager impl } public static void configure(Properties properties) { - + try { - + spillFileSizeThreshold = Long.parseLong( properties.getProperty("pig.spill.size.threshold") ) ; - + gcActivationSize = Long.parseLong( properties.getProperty("pig.spill.gc.activation.size") ) ; - } + } catch (NumberFormatException nfe) { throw new RuntimeException("Error while converting system configurations" + "spill.size.threshold, spill.gc.activation.size", nfe) ; } } - + @Override public void handleNotification(Notification n, Object o) { CompositeData cd = (CompositeData) n.getUserData(); @@ -166,7 +164,7 @@ public class SpillableMemoryManager impl toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5); //log - String msg = "memory handler call- Usage threshold " + String msg = "memory handler call- Usage threshold " + info.getUsage(); if(!firstUsageThreshExceededLogged){ log.info("first " + msg); @@ -177,7 +175,7 @@ public class SpillableMemoryManager impl } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction); toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5); - + //log String msg = "memory handler call - Collection threshold " + info.getUsage(); @@ -191,7 +189,7 @@ public class SpillableMemoryManager impl } clearSpillables(); if (toFree < 0) { - log.debug("low memory handler returning " + + log.debug("low memory handler returning " + "because there is nothing to free"); return; } @@ -203,7 +201,7 @@ public class SpillableMemoryManager impl * becomes null, but it will be close enough. * Also between the time we sort and we use these spillables, they * may actually change in size - so this is just best effort - */ + */ @Override public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) { Spillable o1 = o1Ref.get(); @@ -219,7 +217,7 @@ public class SpillableMemoryManager impl } long o1Size = o1.getMemorySize(); long o2Size = o2.getMemorySize(); - + if (o1Size == o2Size) { return 0; } @@ -254,8 +252,10 @@ public class SpillableMemoryManager impl // we force GC to make sure we really need to keep this // object before paying for the expensive spill(). // Done at most once per handleNotification. + // Do not invoke extraGC for GroupingSpillable. Its size will always exceed + // extraGCSpillSizeThreshold and the data is always strong referenced. if( !extraGCCalled && extraGCSpillSizeThreshold != 0 - && toBeFreed > extraGCSpillSizeThreshold ) { + && toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) { log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()"); // this extra assignment to null is needed so that gc can free the // spillable if nothing else is pointing at it @@ -271,7 +271,7 @@ public class SpillableMemoryManager impl continue; } } - s.spill(); + s.spill(); numObjSpilled++; estimatedFreed += toBeFreed; accumulatedFreeSize += toBeFreed; @@ -280,13 +280,13 @@ public class SpillableMemoryManager impl if (accumulatedFreeSize > gcActivationSize) { invokeGC = true; } - + if (estimatedFreed > toFree) { log.debug("Freed enough space - getting out of memory handler"); invokeGC = true; break; } - } + } /* Poke the GC again to see if we successfully freed enough memory */ if(invokeGC) { System.gc(); @@ -301,7 +301,7 @@ public class SpillableMemoryManager impl } } - + public void clearSpillables() { synchronized (spillables) { // Walk the list first and remove nulls, otherwise the sort Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Thu Nov 27 12:49:54 2014 @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Properties; import org.apache.hadoop.conf.Configuration; +import org.apache.pig.JVMReuseManager; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; public class UDFContext { @@ -41,6 +43,10 @@ public class UDFContext { } }; + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class); + } + private UDFContext() { udfConfs = new HashMap<UDFContextKey, Properties>(); } @@ -62,7 +68,8 @@ public class UDFContext { /* * internal pig use only - should NOT be called from user code */ - public static void destroy() { + @StaticDataCleanup + public static void cleanupStaticData() { tss = new ThreadLocal<UDFContext>() { @Override public UDFContext initialValue() { Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Thu Nov 27 12:49:54 2014 @@ -23,10 +23,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintStream; import java.io.SequenceInputStream; -import java.net.URL; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; @@ -42,12 +40,10 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.JobConf; @@ -65,7 +61,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.InterStorage; import org.apache.pig.impl.io.ReadToEndLoader; @@ -76,6 +71,7 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.newplan.logical.relational.LogicalSchema; import org.apache.pig.parser.ParserException; import org.apache.pig.parser.QueryParserDriver; +import org.joda.time.DateTimeZone; import com.google.common.collect.Lists; import com.google.common.primitives.Longs; @@ -85,17 +81,32 @@ import com.google.common.primitives.Long */ public class Utils { private static final Log log = LogFactory.getLog(Utils.class); - + private static final Pattern JAVA_MAXHEAPSIZE_PATTERN = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); + + /** * This method checks whether JVM vendor is IBM * @return true if IBM JVM is being used * false otherwise */ - public static boolean isVendorIBM() { + public static boolean isVendorIBM() { return System.getProperty("java.vendor").contains("IBM"); } - - + + public static boolean isHadoop23() { + String version = org.apache.hadoop.util.VersionInfo.getVersion(); + if (version.matches("\\b0\\.23\\..+\\b")) + return true; + return false; + } + + public static boolean isHadoop2() { + String version = org.apache.hadoop.util.VersionInfo.getVersion(); + if (version.matches("\\b2\\.\\d+\\..+")) + return true; + return false; + } + /** * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)} * checks if two objects are equals - two levels of checks are @@ -238,7 +249,7 @@ public class Utils { } public static LogicalSchema parseSchema(String schemaString) throws ParserException { - QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), + QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), "util", new HashMap<String, String>() ) ; LogicalSchema schema = queryParser.parseSchema(schemaString); return schema; @@ -249,7 +260,7 @@ public class Utils { * field. This will be called only when PigStorage is invoked with * '-tagFile' or '-tagPath' option and the schema file is present to be * loaded. - * + * * @param schema * @param fieldName * @return ResourceSchema @@ -383,7 +394,7 @@ public class Utils { } else if (TEMPFILE_STORAGE.TFILE.lowerName().equals(tmpFileCompressionStorage)) { return TEMPFILE_STORAGE.TFILE; } else { - throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage + + throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage + ". Should be one of " + Arrays.toString(TEMPFILE_STORAGE.values())); } } @@ -582,7 +593,7 @@ public class Utils { // substitute eval = eval.substring(0, match.start())+val+eval.substring(match.end()); } - throw new IllegalStateException("Variable substitution depth too large: " + throw new IllegalStateException("Variable substitution depth too large: " + MAX_SUBST + " " + expr); } @@ -648,4 +659,35 @@ public class Utils { return null; } + + public static int extractHeapSizeInMB(String input) { + int ret = 0; + if(input == null || input.equals("")) + return ret; + Matcher m = JAVA_MAXHEAPSIZE_PATTERN.matcher(input); + String heapStr = null; + String heapNum = null; + // Grabs the last match which takes effect (in case that multiple Xmx options specified) + while (m.find()) { + heapStr = m.group(1); + heapNum = m.group(2); + } + if (heapStr != null) { + // when Xmx specified in Gigabyte + if(heapStr.endsWith("g") || heapStr.endsWith("G")) { + ret = Integer.parseInt(heapNum) * 1024; + } else { + ret = Integer.parseInt(heapNum); + } + } + return ret; + } + + public static void setDefaultTimeZone(Configuration conf) { + String dtzStr = conf.get(PigConfiguration.PIG_DATETIME_DEFAULT_TIMEZONE); + if (dtzStr != null && dtzStr.length() > 0) { + // don't use offsets because it breaks across DST/Standard Time + DateTimeZone.setDefault(DateTimeZone.forID(dtzStr)); + } + } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java Thu Nov 27 12:49:54 2014 @@ -27,7 +27,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; @@ -48,6 +50,7 @@ import org.apache.hadoop.hive.serde2.typ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.BytesWritable; +import org.apache.pig.PigWarning; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; @@ -57,6 +60,7 @@ import org.apache.pig.data.DataByteArray import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.tools.pigstats.PigStatusReporter; import org.joda.time.DateTime; public class OrcUtils { @@ -92,7 +96,14 @@ public class OrcUtils { for (Map.Entry<Object, Object> entry : m.entrySet()) { Object convertedKey = convertOrcToPig(entry.getKey(), keyObjectInspector, null); Object convertedValue = convertOrcToPig(entry.getValue(), valueObjectInspector, null); - ((Map)result).put(convertedKey.toString(), convertedValue); + if (convertedKey!=null) { + ((Map)result).put(convertedKey.toString(), convertedValue); + } else { + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.incrCounter(PigWarning.UDF_WARNING_1, 1); + } + } } break; case LIST: @@ -125,6 +136,12 @@ public class OrcUtils { case STRING: result = poi.getPrimitiveJavaObject(obj); break; + case CHAR: + result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue(); + break; + case VARCHAR: + result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue(); + break; case BYTE: result = (int)(Byte)poi.getPrimitiveJavaObject(obj); break; @@ -222,6 +239,12 @@ public class OrcUtils { case STRING: fieldSchema.setType(DataType.CHARARRAY); break; + case VARCHAR: + fieldSchema.setType(DataType.CHARARRAY); + break; + case CHAR: + fieldSchema.setType(DataType.CHARARRAY); + break; case TIMESTAMP: fieldSchema.setType(DataType.DATETIME); break; Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014 @@ -512,7 +512,7 @@ public class ExpToPhyTranslationVisitor } List<String> cacheFiles = ((EvalFunc)f).getCacheFiles(); if (cacheFiles != null) { - ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()])); + ((POUserFunc)p).setCacheFiles(cacheFiles); } } else { p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014 @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac import org.apache.pig.FuncSpec; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema; +import org.apache.pig.StoreResources; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -142,6 +143,8 @@ public class LogToPhyTranslationVisitor load.setSignature(loLoad.getSignature()); load.setLimit(loLoad.getLimit()); load.setIsTmpLoad(loLoad.isTmpLoad()); + load.setCacheFiles(loLoad.getLoadFunc().getCacheFiles()); + load.setShipFiles(loLoad.getLoadFunc().getShipFiles()); currentPlan.add(load); logToPhyMap.put(loLoad, load); @@ -631,6 +634,7 @@ public class LogToPhyTranslationVisitor List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2); POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst ); + fe.setMapSideOnly(true); fe.addOriginalLocation(cross.getAlias(), cross.getLocation()); currentPlan.add(fe); currentPlan.connect(logToPhyMap.get(op), fe); @@ -953,8 +957,11 @@ public class LogToPhyTranslationVisitor store.setSortInfo(loStore.getSortInfo()); store.setIsTmpStore(loStore.isTmpStore()); store.setStoreFunc(loStore.getStoreFunc()); - store.setSchema(Util.translateSchema( loStore.getSchema() )); + if (loStore.getStoreFunc() instanceof StoreResources) { + store.setCacheFiles(((StoreResources)loStore.getStoreFunc()).getCacheFiles()); + store.setShipFiles(((StoreResources)loStore.getStoreFunc()).getShipFiles()); + } currentPlan.add(store);
