Modified: pig/branches/spark/src/org/apache/pig/builtin/StringMin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/StringMin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/StringMin.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/StringMin.java Fri Mar 4 18:17:39 2016 @@ -46,14 +46,17 @@ public class StringMin extends EvalFunc< } } + @Override public String getInitial() { return Initial.class.getName(); } + @Override public String getIntermed() { return Intermediate.class.getName(); } + @Override public String getFinal() { return Final.class.getName(); } @@ -78,7 +81,7 @@ public class StringMin extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing min in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -95,7 +98,7 @@ public class StringMin extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing min in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } @@ -109,17 +112,17 @@ public class StringMin extends EvalFunc< } catch (Exception e) { int errCode = 2106; String msg = "Error while computing min in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } } } static protected String min(Tuple input) throws ExecException { DataBag values = (DataBag)input.get(0); - + // if we were handed an empty bag, return NULL // this is in compliance with SQL standard - if(values.size() == 0) { + if(values == null || values.size() == 0) { return null; } @@ -130,7 +133,7 @@ public class StringMin extends EvalFunc< Tuple t = it.next(); curMin = (String)(t.get(0)); } - + for (; it.hasNext();) { Tuple t = it.next(); try { @@ -139,25 +142,25 @@ public class StringMin extends EvalFunc< if( s.compareTo(curMin) < 0) { curMin = s; } - + } catch (RuntimeException exp) { int errCode = 2103; String msg = "Problem while computing min of strings."; throw new ExecException(msg, errCode, PigException.BUG, exp); } } - + return curMin; } @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); + return new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); } - + /* accumulator interface */ private String intermediateMin = null; - + @Override public void accumulate(Tuple b) throws IOException { try { @@ -166,16 +169,16 @@ public class StringMin extends EvalFunc< return; } // check if it lexicographically follows curMax - if (intermediateMin == null || intermediateMin.compareTo(curMin) < 0) { + if (intermediateMin == null || intermediateMin.compareTo(curMin) > 0) { intermediateMin = curMin; - } + } } catch (ExecException ee) { throw ee; } catch (Exception e) { int errCode = 2106; String msg = "Error while computing max in " + this.getClass().getSimpleName(); - throw new ExecException(msg, errCode, PigException.BUG, e); + throw new ExecException(msg, errCode, PigException.BUG, e); } }
Modified: pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TOMAP.java Fri Mar 4 18:17:39 2016 @@ -22,6 +22,7 @@ import java.util.Map; import java.util.HashMap; import org.apache.pig.EvalFunc; +import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -30,27 +31,53 @@ import org.apache.pig.impl.logicalLayer. * This class makes a map out of the parameters passed to it * T = foreach U generate TOMAP($0, $1, $2, $3); * It generates a map $0->1, $2->$3 + * + * This UDF also accepts a bag with 'pair' tuples (i.e. tuples with a 'key' and a 'value'). + * */ public class TOMAP extends EvalFunc<Map> { @Override public Map exec(Tuple input) throws IOException { - if (input == null || input.size() < 2) - return null; + if (input == null || input.size() == 0) { + return null; + } + + Map<String, Object> output = new HashMap<String, Object>(); + try { - Map<String, Object> output = new HashMap<String, Object>(); + // Is this a single bag with all the values? + if (input.size() == 1) { + if (input.get(0) instanceof DataBag) { + DataBag bagOfPairs = (DataBag)input.get(0); + if (bagOfPairs.size() == 0) { + return output; + } + + for (Tuple tuple: bagOfPairs) { + if (tuple.size() != 2) { + throw new RuntimeException("All input tuples in the bag MUST have exactly 2 fields"); + } + String key = (String)tuple.get(0); + Object val = tuple.get(1); + output.put(key, val); + } + return output; + } else { + return null; // If only 1 value then it must be a bag + } + } for (int i = 0; i < input.size(); i=i+2) { - String key = (String)input.get(i); - Object val = input.get(i+1); - output.put(key, val); + String key = (String)input.get(i); + Object val = input.get(i+1); + output.put(key, val); } - - return output; - } catch (ClassCastException e){ - throw new RuntimeException("Map key must be a String"); - } catch (ArrayIndexOutOfBoundsException e){ - throw new RuntimeException("Function input must have even number of parameters"); + return output; + } catch (ClassCastException e){ + throw new RuntimeException("Map key must be a String"); + } catch (ArrayIndexOutOfBoundsException e){ + throw new RuntimeException("Function input must have even number of parameters"); } catch (Exception e) { throw new RuntimeException("Error while creating a map", e); } @@ -58,7 +85,34 @@ public class TOMAP extends EvalFunc<Map> @Override public Schema outputSchema(Schema input) { - return new Schema(new Schema.FieldSchema(null, DataType.MAP)); + Byte valueType = null; + if (input.size() == 1) { + // If input is bag with 'pair' tuples + Schema bagSchema = input.getFields().get(0).schema; + if (bagSchema != null && bagSchema.size() == 1) { + Schema tupleSchema = bagSchema.getFields().get(0).schema; + if (tupleSchema != null) { + valueType = tupleSchema.getFields().get(1).type; + } + } + } else if (input != null && input.getFields()!=null) { + for (int i=0;i<input.size();i++) { + if (i % 2 == 1) { + if (valueType == null) { + valueType = input.getFields().get(i).type; + } else if (valueType != input.getFields().get(i).type) { + valueType = DataType.BYTEARRAY; + break; + } + } + } + } + Schema s = new Schema(new Schema.FieldSchema(null, DataType.MAP)); + if (valueType != null && valueType != DataType.BYTEARRAY) { + s.getFields().get(0).schema = new Schema(new Schema.FieldSchema(null, valueType)); + return s; + } + return s; } @Override Modified: pig/branches/spark/src/org/apache/pig/builtin/TOP.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TOP.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TOP.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TOP.java Fri Mar 4 18:17:39 2016 @@ -27,6 +27,7 @@ import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.AccumulatorEvalFunc; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; @@ -68,7 +69,7 @@ import org.apache.pig.impl.logicalLayer. * GENERATE FLATTEN(result); * * } */ -public class TOP extends EvalFunc<DataBag> implements Algebraic{ +public class TOP extends AccumulatorEvalFunc<DataBag> implements Algebraic { private static final Log log = LogFactory.getLog(TOP.class); private static BagFactory mBagFactory = BagFactory.getInstance(); private static TupleFactory mTupleFactory = TupleFactory.getInstance(); @@ -139,35 +140,27 @@ public class TOP extends EvalFunc<DataBa } } + // for Accumulator interface + private PriorityQueue<Tuple> store = null; + @Override - public DataBag exec(Tuple tuple) throws IOException { + public void accumulate(Tuple tuple) throws IOException { if (tuple == null || tuple.size() < 3) { - return null; + return; } try { int n = (Integer) tuple.get(0); int fieldNum = (Integer) tuple.get(1); DataBag inputBag = (DataBag) tuple.get(2); if (inputBag == null) { - return null; + return; } - PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1, - new TupleComparator(fieldNum, sortDesc)); - updateTop(store, n, inputBag); - DataBag outputBag = mBagFactory.newDefaultBag(); - for (Tuple t : store) { - outputBag.add(t); - } - if (log.isDebugEnabled()) { - if (randomizer.nextInt(1000) == 1) { - log.debug("outputting a bag: "); - for (Tuple t : outputBag) - log.debug("outputting "+t.toDelimitedString("\t")); - log.debug("=================="); - } + if (store == null) { + store = new PriorityQueue<Tuple>(n + 1, new TupleComparator(fieldNum, sortDesc)); } - return outputBag; + + updateTop(store, n, inputBag); } catch (ExecException e) { throw new RuntimeException("ExecException executing function: ", e); } catch (Exception e) { @@ -175,6 +168,40 @@ public class TOP extends EvalFunc<DataBa } } + @Override + public DataBag getValue() { + if (store == null) { + return null; + } + + DataBag outputBag = mBagFactory.newDefaultBag(); + + for (Tuple t : store) { + outputBag.add(t); + } + + if (log.isDebugEnabled()) { + if (randomizer.nextInt(1000) == 1) { + log.debug("outputting a bag: "); + try { + for (Tuple t : outputBag) { + log.debug("outputting "+t.toDelimitedString("\t")); + } + } catch (ExecException e) { + throw new RuntimeException("ExecException executing function: ", e); + } + log.debug("=================="); + } + } + + return outputBag; + } + + @Override + public void cleanup() { + store = null; + } + protected static void updateTop(PriorityQueue<Tuple> store, int limit, DataBag inputBag) { Iterator<Tuple> itr = inputBag.iterator(); while (itr.hasNext()) { Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Mar 4 18:17:39 2016 @@ -22,6 +22,8 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -29,11 +31,13 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -51,6 +55,12 @@ public class TextLoader extends LoadFunc protected RecordReader in = null; private TupleFactory mTupleFactory = TupleFactory.getInstance(); private String loadLocation; + protected final Log mLog = LogFactory.getLog(getClass()); + + // it determines whether to depend on pig's own Bzip2TextInputFormat or + // to simply depend on hadoop for handling bzip2 inputs + private boolean bzipinput_usehadoops ; + @Override public Tuple getNext() throws IOException { @@ -248,9 +258,13 @@ public class TextLoader extends LoadFunc @Override public InputFormat getInputFormat() { - if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) { + if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) + && !HadoopShims.isHadoopYARN() + && !bzipinput_usehadoops ) { + mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { + mLog.info("Using PigTextInputFormat"); return new PigTextInputFormat(); } } @@ -269,5 +283,8 @@ public class TextLoader extends LoadFunc public void setLocation(String location, Job job) throws IOException { loadLocation = location; FileInputFormat.setInputPaths(job, location); + bzipinput_usehadoops = job.getConfiguration().getBoolean( + PigConfiguration.PIG_BZIP_USE_HADOOP_INPUTFORMAT, + true ); } } Modified: pig/branches/spark/src/org/apache/pig/builtin/ToDate.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ToDate.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/ToDate.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/ToDate.java Fri Mar 4 18:17:39 2016 @@ -117,15 +117,45 @@ public class ToDate extends EvalFunc<Dat } public static DateTimeZone extractDateTimeZone(String dtStr) { - return isoDateTimeFormatter.parseDateTime(dtStr).getZone(); + return isoDateTimeFormatter.parseDateTime(allowIso8601Space(dtStr)).getZone(); } public static DateTime extractDateTime(String dtStr) { - return isoDateTimeFormatter.parseDateTime(dtStr); + return isoDateTimeFormatter.parseDateTime(allowIso8601Space(dtStr)); + } + + /* + * ISO-8601 format and JDBC timestamp format are similar but not the same. + * + * Strict ISO-8601 specifies a 'T' between the date portion and + * the time portion: + * 2015-05-29T10:41:30.123 + * + * ISO-8601 allows a space instead of a 'T' as a looser variant. + * This variant is often adopted because it increases human readability. + * The JDBC timestamp format uses the ' ' space variant. + * 2015-05-29 10:41:30.123 + * + * Hive & Impala are database-oriented and generate JDBC timestamps + * with a ' ' space. + * + * We would like to accept both 'T' and ' ' space formats. + * + * org.joda.time.format.ISODateTimeFormatter requires the 'T'. + * The cleanest way to get joda-time to accept both is to convert + * the ' ' space to a a 'T' before feeding the string to the + * ISODateTimeFormatter. + */ + private static String allowIso8601Space(String dtStr) { + if (dtStr == null || dtStr.length() <= 10 || dtStr.charAt(10) != ' ') { + return dtStr; + } + return dtStr.substring(0, 10) + 'T' + dtStr.substring(11); } @Override public boolean allowCompileTimeCalculation() { return true; } + } Modified: pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java Fri Mar 4 18:17:39 2016 @@ -321,6 +321,7 @@ public class Utf8StorageConverter implem break; case DataType.BIGDECIMAL: field = bytesToBigDecimal(b); + break; case DataType.DATETIME: field = bytesToDateTime(b); break; Modified: pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/VALUELIST.java Fri Mar 4 18:17:39 2016 @@ -105,7 +105,7 @@ public class VALUELIST extends EvalFunc< throw new RuntimeException(fe); } if(fs != null) { - innerFieldSchema = new Schema.FieldSchema(null, fs.type); + innerFieldSchema = new Schema.FieldSchema(null, new Schema(fs)); } } else { innerFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY); Modified: pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/VALUESET.java Fri Mar 4 18:17:39 2016 @@ -118,7 +118,7 @@ public class VALUESET extends EvalFunc<D throw new RuntimeException(fe); } if (fs != null) { - innerFieldSchema = new Schema.FieldSchema(null, fs.type); + innerFieldSchema = new Schema.FieldSchema(null, new Schema(fs)); } } else { innerFieldSchema = new Schema.FieldSchema(null, DataType.BYTEARRAY); Modified: pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java Fri Mar 4 18:17:39 2016 @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; +import java.util.TreeMap; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -74,7 +75,9 @@ import org.apache.pig.parser.ParserExcep * data.set("foo", * tuple("a"), * tuple("b"), - * tuple("c") + * tuple("c"), + * tuple(map("d","e", "f","g")), + * tuple(bag(tuple("h"),tuple("i"))) * ); * * pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();"); @@ -85,6 +88,8 @@ import org.apache.pig.parser.ParserExcep * assertEquals(tuple("a"), out.get(0)); * assertEquals(tuple("b"), out.get(1)); * assertEquals(tuple("c"), out.get(2)); + * assertEquals(tuple(map("f", "g", "d", "e" )), out.get(3)); + * assertEquals(tuple(bag(tuple("h"),tuple("i"))), out.get(4)); * </pre> * With Schema: * <pre> @@ -102,7 +107,7 @@ import org.apache.pig.parser.ParserExcep * pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();"); * * assertEquals(schema("a:chararray,b:chararray"), data.getSchema("bar")); - * + * * List<Tuple> out = data.get("bar"); * assertEquals(tuple("a", "a"), out.get(0)); * assertEquals(tuple("b", "b"), out.get(1)); @@ -132,7 +137,37 @@ public class Storage extends LoadFunc im public static DataBag bag(Tuple... tuples) { return new NonSpillableDataBag(Arrays.asList(tuples)); } - + + /** + * @param input These params are alternating "key", "value". So the number of params MUST be even !! + * Implementation is very similar to the TOMAP UDF. + * So map("A", B, "C", D) generates a map "A"->B, "C"->D + * @return a map containing the provided objects + */ + public static Map<String, Object> map(Object... input) { + if (input == null || input.length < 2) { + return null; + } + + try { + Map<String, Object> output = new HashMap<String, Object>(); + + for (int i = 0; i < input.length; i=i+2) { + String key = (String)input[i]; + Object val = input[i+1]; + output.put(key, val); + } + + return output; + } catch (ClassCastException e){ + throw new IllegalArgumentException("Map key must be a String"); + } catch (ArrayIndexOutOfBoundsException e){ + throw new IllegalArgumentException("Function input must have even number of parameters"); + } catch (Exception e) { + throw new RuntimeException("Error while creating a map", e); + } + } + /** * @param schema * @return the schema represented by the string @@ -193,7 +228,8 @@ public class Storage extends LoadFunc im private static class Parts { final String location; - final Map<String, Collection<Tuple>> parts = new HashMap<String, Collection<Tuple>>(); + // TreeMap to read part files in order + final Map<String, Collection<Tuple>> parts = new TreeMap<String, Collection<Tuple>>(); public Parts(String location) { super(); @@ -216,7 +252,7 @@ public class Storage extends LoadFunc im } } - + /** * An isolated data store to avoid side effects * @@ -249,7 +285,7 @@ public class Storage extends LoadFunc im public void set(String location, String schema, Tuple... data) throws ParserException { set(location, Utils.getSchemaFromString(schema), Arrays.asList(data)); } - + /** * to set the data in a location with a known schema * @@ -316,7 +352,7 @@ public class Storage extends LoadFunc im public void set(String location, Tuple... data) { set(location, Arrays.asList(data)); } - + /** * * @param location @@ -330,7 +366,7 @@ public class Storage extends LoadFunc im } /** - * + * * @param location * @return the schema stored in this location */ @@ -352,7 +388,7 @@ public class Storage extends LoadFunc im private String location; private Data data; - + private Schema schema; private Iterator<Tuple> dataBeingRead; @@ -403,9 +439,9 @@ private MockRecordWriter mockRecordWrite public void setUDFContextSignature(String signature) { super.setUDFContextSignature(signature); } - + // LoadMetaData - + @Override public ResourceSchema getSchema(String location, Job job) throws IOException { init(location, job); @@ -477,7 +513,7 @@ private MockRecordWriter mockRecordWrite } // StoreMetaData - + @Override public void storeStatistics(ResourceStatistics stats, String location, Job job) throws IOException { @@ -490,7 +526,7 @@ private MockRecordWriter mockRecordWrite init(location, job); data.setSchema(location, Schema.getPigSchema(schema)); } - + // Mocks for LoadFunc private static class MockRecordReader extends RecordReader<Object, Object> { Modified: pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java (original) +++ pig/branches/spark/src/org/apache/pig/data/BinInterSedes.java Fri Mar 4 18:17:39 2016 @@ -1072,7 +1072,7 @@ public class BinInterSedes implements In // we have a compound tuple key (main_key, secondary_key). Each key has its own sort order, so // we have to deal with them separately. We delegate it to the first invocation of // compareDatum() - assert (tsz1 == 3); // main_key, secondary_key, value + assert (tsz1 == 2); // main_key, secondary_key result = compareDatum(t1.get(0), t2.get(0), mAsc); if (result == 0) result = compareDatum(t1.get(1), t2.get(1), mSecondaryAsc); Modified: pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DataReaderWriter.java Fri Mar 4 18:17:39 2016 @@ -174,10 +174,14 @@ public class DataReaderWriter { return Double.valueOf(in.readDouble()); case DataType.BIGINTEGER: - return new BigInteger(((DataByteArray)readDatum(in, in.readByte())).get()); + byte[] bigIntegerByteArray = new byte[in.readInt()]; + in.readFully(bigIntegerByteArray); + return new BigInteger(bigIntegerByteArray); case DataType.BIGDECIMAL: - return new BigDecimal((String)readDatum(in, in.readByte())); + byte[] bt = new byte[in.readInt()]; + in.readFully(bt); + return new BigDecimal(new String(bt, DataReaderWriter.UTF8)); case DataType.BOOLEAN: return Boolean.valueOf(in.readBoolean()); @@ -315,12 +319,16 @@ public class DataReaderWriter { case DataType.BIGINTEGER: out.writeByte(DataType.BIGINTEGER); - writeDatum(out, ((BigInteger)val).toByteArray()); + byte[] bytes = ((BigInteger)val).toByteArray(); + out.writeInt(bytes.length); + out.write(bytes); break; case DataType.BIGDECIMAL: out.writeByte(DataType.BIGDECIMAL); - writeDatum(out, ((BigDecimal)val).toString()); + byte[] bt = ((BigDecimal)val).toString().getBytes(DataReaderWriter.UTF8); + out.writeInt(bt.length); + out.write(bt); break; case DataType.CHARARRAY: { Modified: pig/branches/spark/src/org/apache/pig/data/DataType.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DataType.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DataType.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DataType.java Fri Mar 4 18:17:39 2016 @@ -1126,7 +1126,7 @@ public class DataType { case UNKNOWN: default: int errCode = 1071; - String msg = "Cannot convert a " + findTypeName(o) + " to a Boolean"; + String msg = "Cannot convert a " + findTypeName(o) + " to a DateTime"; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ClassCastException cce) { @@ -1135,11 +1135,11 @@ public class DataType { throw ee; } catch (NumberFormatException nfe) { int errCode = 1074; - String msg = "Problem with formatting. Could not convert " + o + " to Float."; + String msg = "Problem with formatting. Could not convert " + o + " to DateTime."; throw new ExecException(msg, errCode, PigException.INPUT, nfe); } catch (Exception e) { int errCode = 2054; - String msg = "Internal error. Could not convert " + o + " to Float."; + String msg = "Internal error. Could not convert " + o + " to DateTime."; throw new ExecException(msg, errCode, PigException.BUG); } } Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTuple.java Fri Mar 4 18:17:39 2016 @@ -26,6 +26,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -37,7 +39,6 @@ import org.apache.pig.data.utils.MethodH import org.apache.pig.data.utils.SedesHelper; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.util.ObjectSerializer; -import org.mortbay.log.Log; import com.google.common.collect.Lists; @@ -52,6 +53,7 @@ import com.google.common.collect.Lists; @InterfaceAudience.Public @InterfaceStability.Unstable public abstract class SchemaTuple<T extends SchemaTuple<T>> extends AbstractTuple implements TypeAwareTuple { + private static final Log LOG = LogFactory.getLog(SchemaTuple.class); private static final long serialVersionUID = 1L; private static final int ONE_MINUTE = 60000; @@ -924,7 +926,7 @@ public abstract class SchemaTuple<T exte protected static Schema staticSchemaGen(String s) { try { if (s.equals("")) { - Log.warn("No Schema present in SchemaTuple generated class"); + LOG.warn("No Schema present in SchemaTuple generated class"); return new Schema(); } return (Schema) ObjectSerializer.deserialize(s); Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Fri Mar 4 18:17:39 2016 @@ -57,9 +57,7 @@ import org.apache.log4j.Level; import org.apache.pig.ExecType; import org.apache.pig.ExecTypeProvider; import org.apache.pig.FuncSpec; -import org.apache.pig.JVMReuseManager; import org.apache.pig.PigException; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.datastorage.DataStorageException; import org.apache.pig.backend.datastorage.ElementDescriptor; @@ -177,11 +175,7 @@ public class PigContext implements Seria // List of paths skipped for automatic shipping List<String> skippedShipPaths = new ArrayList<String>(); - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class); - } - - @StaticDataCleanup + //@StaticDataCleanup public static void staticDataCleanup() { packageImportList.set(null); } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Fri Mar 4 18:17:39 2016 @@ -18,8 +18,11 @@ package org.apache.pig.impl.builtin; import java.io.IOException; +import java.text.MessageFormat; import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; @@ -32,6 +35,9 @@ import org.apache.pig.impl.util.UDFConte public class GFCross extends EvalFunc<DataBag> { + + private static final Log LOG = LogFactory.getLog(GFCross.class); + private int numInputs, myNumber, numGroupsPerInput, numGroupsGoingTo; private BagFactory mBagFactory = BagFactory.getInstance(); private TupleFactory mTupleFactory = TupleFactory.getInstance(); @@ -70,6 +76,12 @@ public class GFCross extends EvalFunc<Da numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs)); numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1); + + LOG.info(MessageFormat.format("Parallelism = {0}, numInputs = {1}, myNumber = {2}," + + " numGroupsPerInput = {3}, numGroupsGoingTo = {4}", + parallelism, numInputs, myNumber, + numGroupsPerInput, numGroupsGoingTo)); + } DataBag output = mBagFactory.newDefaultBag(); Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/GetMemNumRows.java Fri Mar 4 18:17:39 2016 @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.reflect.Type; import org.apache.pig.EvalFunc; +import org.apache.pig.data.SizeUtil; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -56,6 +57,7 @@ public class GetMemNumRows extends EvalF int tSize = in.size(); if(tSize >=2 && PoissonSampleLoader.NUMROWS_TUPLE_MARKER.equals(in.get(tSize-2)) ){ + memSize -= SizeUtil.getPigObjMemSize(PoissonSampleLoader.NUMROWS_TUPLE_MARKER); numRows = (Long)in.get(tSize-1); } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri Mar 4 18:17:39 2016 @@ -64,6 +64,9 @@ public class PoissonSampleLoader extends private int sampleRate = DEFAULT_SAMPLE_RATE; + // total memory in bytes + private long totalMemory; + private double heapPerc = PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE; // new Sample tuple @@ -89,7 +92,8 @@ public class PoissonSampleLoader extends if(t == null) { return createNumRowTuple(null); } - long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc); + long availRedMem = (long) ( totalMemory * heapPerc); + // availRedMem = 155084396; memToSkipPerSample = availRedMem/sampleRate; updateSkipInterval(t); @@ -175,6 +179,10 @@ public class PoissonSampleLoader extends sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE); heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE, PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE); + totalMemory = conf.getLong(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM, -1L); + if (totalMemory == -1) { + totalMemory = Runtime.getRuntime().maxMemory(); + } } } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/ReadScalars.java Fri Mar 4 18:17:39 2016 @@ -74,7 +74,8 @@ public class ReadScalars extends EvalFun valueLoaded = true; return null; } else if (inputBag.size() > 1) { - String msg = "Scalar has more than one row in the output."; + String msg = "Scalar has more than one row in the output." + +" (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )"; throw new ExecException(msg); } Tuple t1 = inputBag.iterator().next(); @@ -111,7 +112,8 @@ public class ReadScalars extends EvalFun Tuple t2 = loader.getNext(); if(t2 != null){ String msg = "Scalar has more than one row in the output. " - + "1st : " + t1 + ", 2nd :" + t2; + + "1st : " + t1 + ", 2nd :" + t2 + +" (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )"; throw new ExecException(msg); } valueLoaded = true; Modified: pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/NullablePartitionWritable.java Fri Mar 4 18:17:39 2016 @@ -70,9 +70,9 @@ public class NullablePartitionWritable e @Override public void readFields(DataInput in) throws IOException { - String c = in.readUTF(); + byte type = in.readByte(); try { - key = HDataType.getWritableComparable(c); + key = HDataType.getNewWritableComparable(type); } catch(Exception e) { throw new IOException(e); } @@ -81,7 +81,7 @@ public class NullablePartitionWritable e @Override public void write(DataOutput out) throws IOException { - out.writeUTF(key.getClass().getName()); + out.writeByte(HDataType.findTypeFromClassName(key.getClass().getName())); key.write(out); } Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Mar 4 18:17:39 2016 @@ -196,13 +196,15 @@ public class ReadToEndLoader extends Loa private boolean initializeReader() throws IOException, InterruptedException { + // Close the previous reader first + if(reader != null) { + reader.close(); + reader = null; + } if(curSplitIndex > inpSplits.size() - 1) { // past the last split, we are done return false; } - if(reader != null){ - reader.close(); - } InputSplit curSplit = inpSplits.get(curSplitIndex); TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID()); Modified: pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/logicalLayer/schema/SchemaUtil.java Fri Mar 4 18:17:39 2016 @@ -263,7 +263,7 @@ public class SchemaUtil { throw new FrontendException( "Currently pig do not support this kind of type using Schema:" + DataType.findTypeName(type) - + ". You can write shema by yourself."); + + ". You can write schema by yourself."); } } @@ -271,11 +271,10 @@ public class SchemaUtil { private static void checkParameters(List<String> names, List<Byte> dataTypes) throws FrontendException { - // TODO Auto-generated method stub checkDataTypes(dataTypes); if (names.size() != dataTypes.size()) { throw new FrontendException( - "The number of names is not equal to the number of dataTypes"); + "The number of names (" + names.size() + ") is not equal to the number of dataTypes (" + dataTypes.size() + ")"); } } Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorKey.java Fri Mar 4 18:17:39 2016 @@ -89,5 +89,10 @@ public class OperatorKey implements Seri NodeIdGenerator.getGenerator().getNextNodeId(scope)); } + static public OperatorKey fromString(String op) { + String scope = op.substring(0, op.indexOf("-")); + long id = Long.parseLong(op.substring(op.indexOf("-")+1)); + return new OperatorKey(scope, id); + } } Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Fri Mar 4 18:17:39 2016 @@ -153,9 +153,15 @@ public abstract class OutputHandler { recordDelimLength = recordDelimBa.length - 1; //Ignore trailing \n recordDelimStr = new String(recordDelimBa, 0, recordDelimLength, Charsets.UTF_8); } - if (recordDelimLength == 0 || currValue.getLength() < recordDelimLength) { + + if (recordDelimLength == 0) { return true; } + //If our current section is less than the delim length, then its not the end of the row. + if (currValue.getLength() < recordDelimLength) { + return false; + } + return currValue.find(recordDelimStr, currValue.getLength() - recordDelimLength) >= 0; } Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/streaming/PigStreamingUDF.java Fri Mar 4 18:17:39 2016 @@ -202,7 +202,7 @@ public class PigStreamingUDF extends Pig if (StreamingDelimiters.isDelimiter(DELIMS.getFieldDelim(), buf, index, depth, endIndex)) { val = extractString(buf, fieldStart, index - 1, true); - map.put(key, val); + if (key != null) map.put(key, val); fieldStart = index + 3; } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/CompilerUtils.java Fri Mar 4 18:17:39 2016 @@ -21,10 +21,11 @@ package org.apache.pig.impl.util; import java.util.ArrayList; import java.util.List; - import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; @@ -41,30 +42,43 @@ import org.apache.pig.impl.plan.NodeIdGe import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; -/* +/* * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler - * + * */ public class CompilerUtils { - public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException { + public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema, + boolean skewedRightOuterJoin, String isFirstReduceOfKeyClassName) throws PlanException { // we currently have POProject[bag] as the only operator in the plan // If the bag is an empty bag, we should replace // it with a bag with one tuple with null fields so that when we flatten // we do not drop records (flatten will drop records if the bag is left - // as an empty bag) and actually project nulls for the fields in + // as an empty bag) and actually project nulls for the fields in // the empty bag - + // So we need to get to the following state: // POProject[Bag] - // \ - // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields) - // \ | POProject[Bag] + // \ + // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields) + // \ | POProject[Bag] + // \ | / + // POBinCond + // Further, if it is skewed right outer join, only the first reduce of the key + // will generate tuple with null fields (See PIG-4377) + // + // POProject[key] POProject[Bag] + // \ / + // IsFirstReduceOfKey POUserFunc["IsEmpty()"] + // \ / + // \ / + // AND Const[Bag](bag with null fields) + // \ | POProject[Bag] // \ | / // POBinCond POProject relationProject = (POProject) fePlan.getRoots().get(0); try { - + // condition of the bincond POProject relationProjectForIsEmpty = relationProject.clone(); fePlan.add(relationProjectForIsEmpty); @@ -76,7 +90,36 @@ public class CompilerUtils { isEmpty.setResultType(DataType.BOOLEAN); fePlan.add(isEmpty); fePlan.connect(relationProjectForIsEmpty, isEmpty); - + + ExpressionOperator cond; + if (skewedRightOuterJoin) { + POProject projectForKey = new POProject(new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope))); + projectForKey.setColumn(0); + projectForKey.setOverloaded(false); + projectForKey.setResultType(inputSchema.getField(0).type); + + POAnd and = new POAnd(new OperatorKey(scope, NodeIdGenerator.getGenerator(). + getNextNodeId(scope))); + FuncSpec isFirstReduceOfKeySpec = new FuncSpec(isFirstReduceOfKeyClassName); + Object f1 = PigContext.instantiateFuncFromSpec(isFirstReduceOfKeySpec); + POUserFunc isFirstReduceOfKey = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator(). + getNextNodeId(scope)), -1, null, isFirstReduceOfKeySpec, (EvalFunc) f1); + + fePlan.add(projectForKey); + fePlan.add(isFirstReduceOfKey); + fePlan.add(and); + + fePlan.connect(projectForKey, isFirstReduceOfKey); + fePlan.connect(isFirstReduceOfKey, and); + fePlan.connect(isEmpty, and); + and.setLhs(isFirstReduceOfKey); + and.setRhs(isEmpty); + + cond = and; + } else { + cond = isEmpty; + } + // lhs of bincond (const bag with null fields) ConstantExpression ce = new ConstantExpression(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); @@ -93,25 +136,25 @@ public class CompilerUtils { ce.setResultType(DataType.BAG); //this operator doesn't have any predecessors fePlan.add(ce); - + //rhs of bincond is the original project // let's set up the bincond now POBinCond bincond = new POBinCond(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); - bincond.setCond(isEmpty); + bincond.setCond(cond); bincond.setLhs(ce); bincond.setRhs(relationProject); bincond.setResultType(DataType.BAG); fePlan.add(bincond); - fePlan.connect(isEmpty, bincond); + fePlan.connect(cond, bincond); fePlan.connect(ce, bincond); fePlan.connect(relationProject, bincond); } catch (Exception e) { throw new PlanException("Error setting up outerjoin", e); } - + } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java Fri Mar 4 18:17:39 2016 @@ -22,6 +22,7 @@ import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; +import java.util.Map.Entry; import java.util.Properties; import org.apache.commons.logging.Log; @@ -45,7 +46,7 @@ public class PropertiesUtil { loadPropertiesFromClasspath(properties, DEFAULT_PROPERTIES_FILE); loadPropertiesFromClasspath(properties, PROPERTIES_FILE); setDefaultsIfUnset(properties); - + //Now set these as system properties only if they are not already defined. if (log.isDebugEnabled()) { for (Object o: properties.keySet()) { @@ -61,7 +62,13 @@ public class PropertiesUtil { // Add System properties which include command line overrides // Any existing keys will be overridden - properties.putAll(System.getProperties()); + for (Entry<Object, Object> entry : System.getProperties().entrySet()) { + String key = (String) entry.getKey(); + if (key.startsWith("sun.") || key.startsWith("java.")) { + continue; + } + properties.put(key, entry.getValue()); + } // For telling error fast when there are problems ConfigurationValidator.validatePigProperties(properties) ; @@ -150,7 +157,7 @@ public class PropertiesUtil { properties.setProperty(PigConfiguration.PIG_OPT_FETCH, ""+true); } } - + /** * Loads default properties. * @return default properties Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Mar 4 18:17:39 2016 @@ -48,9 +48,13 @@ import org.apache.commons.logging.LogFac */ public class SpillableMemoryManager implements NotificationListener { - private final Log log = LogFactory.getLog(getClass()); + private static final Log log = LogFactory.getLog(SpillableMemoryManager.class); - LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); + private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>(); + // References to spillables with size + private LinkedList<SpillablePtr> spillablesSR = null; + + private Object spillLock = new Object(); // if we freed at least this much, invoke GC // (default 40 MB - this can be overridden by user supplied property) @@ -62,15 +66,15 @@ public class SpillableMemoryManager impl // this will keep track of memory freed across spills // and between GC invocations - private static long accumulatedFreeSize = 0L; + private long accumulatedFreeSize = 0L; // fraction of biggest heap for which we want to get // "memory usage threshold exceeded" notifications - private static double memoryThresholdFraction = 0.7; + private double memoryThresholdFraction = 0.7; // fraction of biggest heap for which we want to get // "collection threshold exceeded" notifications - private static double collectionMemoryThresholdFraction = 0.5; + private double collectionMemoryThresholdFraction = 0.5; // log notification on usage threshold exceeded only the first time private boolean firstUsageThreshExceededLogged = false; @@ -80,10 +84,18 @@ public class SpillableMemoryManager impl // fraction of the total heap used for the threshold to determine // if we want to perform an extra gc before the spill - private static double extraGCThresholdFraction = 0.05; - private static long extraGCSpillSizeThreshold = 0L; + private double extraGCThresholdFraction = 0.05; + private long extraGCSpillSizeThreshold = 0L; + + private volatile boolean blockRegisterOnSpill = false; + + private static final SpillableMemoryManager manager = new SpillableMemoryManager(); - private static volatile SpillableMemoryManager manager; + //@StaticDataCleanup + public static void staticDataCleanup() { + manager.spillables.clear(); + manager.accumulatedFreeSize = 0L; + } private SpillableMemoryManager() { ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null); @@ -129,9 +141,6 @@ public class SpillableMemoryManager impl } public static SpillableMemoryManager getInstance() { - if (manager == null) { - manager = new SpillableMemoryManager(); - } return manager; } @@ -187,119 +196,136 @@ public class SpillableMemoryManager impl } } - clearSpillables(); if (toFree < 0) { log.debug("low memory handler returning " + "because there is nothing to free"); return; } - synchronized(spillables) { - Collections.sort(spillables, new Comparator<WeakReference<Spillable>>() { - /** - * We don't lock anything, so this sort may not be stable if a WeakReference suddenly - * becomes null, but it will be close enough. - * Also between the time we sort and we use these spillables, they - * may actually change in size - so this is just best effort - */ - @Override - public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) { - Spillable o1 = o1Ref.get(); - Spillable o2 = o2Ref.get(); - if (o1 == null && o2 == null) { - return 0; - } - if (o1 == null) { - return 1; + // Use a separate spillLock to block multiple handleNotification calls + synchronized (spillLock) { + synchronized(spillables) { + spillablesSR = new LinkedList<SpillablePtr>(); + for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) { + Spillable s = i.next().get(); + if (s == null) { + i.remove(); + continue; } - if (o2 == null) { + // Create a list with spillable size for stable sorting. Refer PIG-4012 + spillablesSR.add(new SpillablePtr(s, s.getMemorySize())); + } + log.debug("Spillables list size: " + spillablesSR.size()); + Collections.sort(spillablesSR, new Comparator<SpillablePtr>() { + @Override + public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) { + long o1Size = o1Ref.getMemorySize(); + long o2Size = o2Ref.getMemorySize(); + + if (o1Size == o2Size) { + return 0; + } + if (o1Size < o2Size) { + return 1; + } return -1; } - long o1Size = o1.getMemorySize(); - long o2Size = o2.getMemorySize(); + }); + // Block new bags from being registered + blockRegisterOnSpill = true; + } - if (o1Size == o2Size) { - return 0; - } - if (o1Size < o2Size) { - return 1; - } - return -1; - } - }); - long estimatedFreed = 0; - int numObjSpilled = 0; - boolean invokeGC = false; - boolean extraGCCalled = false; - for (Iterator<WeakReference<Spillable>> i = spillables.iterator(); i.hasNext();) { - WeakReference<Spillable> weakRef = i.next(); - Spillable s = weakRef.get(); - // Still need to check for null here, even after we removed - // above, because the reference may have gone bad on us - // since the last check. - if (s == null) { - i.remove(); - continue; - } - long toBeFreed = s.getMemorySize(); - log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize); - // Don't keep trying if the rest of files are too small - if (toBeFreed < spillFileSizeThreshold) { - log.debug("spilling small files - getting out of memory handler"); - break ; - } - // If single Spillable is bigger than the threshold, - // we force GC to make sure we really need to keep this - // object before paying for the expensive spill(). - // Done at most once per handleNotification. - // Do not invoke extraGC for GroupingSpillable. Its size will always exceed - // extraGCSpillSizeThreshold and the data is always strong referenced. - if( !extraGCCalled && extraGCSpillSizeThreshold != 0 - && toBeFreed > extraGCSpillSizeThreshold && !(s instanceof GroupingSpillable)) { - log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()"); - // this extra assignment to null is needed so that gc can free the - // spillable if nothing else is pointing at it - s = null; - System.gc(); - extraGCCalled = true; - // checking again to see if this reference is still valid - s = weakRef.get(); + try { + long estimatedFreed = 0; + int numObjSpilled = 0; + boolean invokeGC = false; + boolean extraGCCalled = false; + boolean isGroupingSpillable = false; + for (Iterator<SpillablePtr> i = spillablesSR.iterator(); i.hasNext();) { + SpillablePtr sPtr = i.next(); + Spillable s = sPtr.get(); + // Still need to check for null here, even after we removed + // above, because the reference may have gone bad on us + // since the last check. if (s == null) { i.remove(); - accumulatedFreeSize = 0; - invokeGC = false; continue; } + long toBeFreed = sPtr.getMemorySize(); + log.debug("Memorysize = "+toBeFreed+", spillFilesizethreshold = "+spillFileSizeThreshold+", gcactivationsize = "+gcActivationSize); + // Don't keep trying if the rest of files are too small + if (toBeFreed < spillFileSizeThreshold) { + log.debug("spilling small files - getting out of memory handler"); + break ; + } + isGroupingSpillable = (s instanceof GroupingSpillable); + // If single Spillable is bigger than the threshold, + // we force GC to make sure we really need to keep this + // object before paying for the expensive spill(). + // Done at most once per handleNotification. + // Do not invoke extraGC for GroupingSpillable. Its size will always exceed + // extraGCSpillSizeThreshold and the data is always strong referenced. + if( !extraGCCalled && extraGCSpillSizeThreshold != 0 + && toBeFreed > extraGCSpillSizeThreshold && !isGroupingSpillable) { + log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()"); + // this extra assignment to null is needed so that gc can free the + // spillable if nothing else is pointing at it + s = null; + System.gc(); + extraGCCalled = true; + // checking again to see if this reference is still valid + s = sPtr.get(); + if (s == null) { + i.remove(); + accumulatedFreeSize = 0; + invokeGC = false; + continue; + } + } + // Unblock registering of new bags temporarily as aggregation + // of POPartialAgg requires new record to be loaded. + blockRegisterOnSpill = !isGroupingSpillable; + long numSpilled; + try { + numSpilled = s.spill(); + } finally { + blockRegisterOnSpill = true; + } + + if (numSpilled > 0) { + numObjSpilled++; + estimatedFreed += toBeFreed; + accumulatedFreeSize += toBeFreed; + } + // This should significantly reduce the number of small files + // in case that we have a lot of nested bags + if (accumulatedFreeSize > gcActivationSize) { + invokeGC = true; + } + + if (estimatedFreed > toFree) { + log.debug("Freed enough space - getting out of memory handler"); + invokeGC = true; + break; + } } - s.spill(); - numObjSpilled++; - estimatedFreed += toBeFreed; - accumulatedFreeSize += toBeFreed; - // This should significantly reduce the number of small files - // in case that we have a lot of nested bags - if (accumulatedFreeSize > gcActivationSize) { - invokeGC = true; + spillablesSR = null; + /* Poke the GC again to see if we successfully freed enough memory */ + if(invokeGC) { + System.gc(); + // now that we have invoked the GC, reset accumulatedFreeSize + accumulatedFreeSize = 0; } - - if (estimatedFreed > toFree) { - log.debug("Freed enough space - getting out of memory handler"); - invokeGC = true; - break; + if(estimatedFreed > 0){ + String msg = "Spilled an estimate of " + estimatedFreed + + " bytes from " + numObjSpilled + " objects. " + info.getUsage();; + log.info(msg); } + } finally { + blockRegisterOnSpill = false; } - /* Poke the GC again to see if we successfully freed enough memory */ - if(invokeGC) { - System.gc(); - // now that we have invoked the GC, reset accumulatedFreeSize - accumulatedFreeSize = 0; - } - if(estimatedFreed > 0){ - String msg = "Spilled an estimate of " + estimatedFreed + - " bytes from " + numObjSpilled + " objects. " + info.getUsage();; - log.info(msg); - } - } + } public void clearSpillables() { @@ -321,7 +347,7 @@ public class SpillableMemoryManager impl * @param s the spillable to track. */ public void registerSpillable(Spillable s) { - synchronized(spillables) { + synchronized (spillables) { // Cleaing the entire list is too expensive. Just trim off the front while // we can. WeakReference<Spillable> first = spillables.peek(); @@ -329,7 +355,46 @@ public class SpillableMemoryManager impl spillables.remove(); first = spillables.peek(); } + + if (blockRegisterOnSpill) { + // When the spill is happening we do not want to register new bags + // save for exceptions like POPartialAgg. So block here. + // blockRegisterOnSpill is set to false in the finally block after spill. + // But just in case adding a safeguard of 5 min timeout (assuming a large + // spill completes within 5 mins) instead of infinitely blocking + // in case there are missed corner cases causing deadlock. + try { + int i = 6000; + for (; i > 0 && blockRegisterOnSpill; i--) { + Thread.sleep(50); + } + if (i == 0) { + log.warn("Spill took more than 5 mins. This needs investigation"); + } + } catch (InterruptedException e) { + log.warn("Interrupted exception in registerSpillable while blocked on spill", e); + } + blockRegisterOnSpill = false; + } spillables.add(new WeakReference<Spillable>(s)); } } + + private static class SpillablePtr { + private WeakReference<Spillable> spillable; + private long size; + + public SpillablePtr(Spillable p, long s) { + spillable = new WeakReference<Spillable>(p); + size = s; + } + + public Spillable get() { + return spillable.get(); + } + + public long getMemorySize() { + return size; + } + } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Fri Mar 4 18:17:39 2016 @@ -24,8 +24,6 @@ import java.util.HashMap; import java.util.Properties; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.JVMReuseManager; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; public class UDFContext { @@ -33,8 +31,9 @@ public class UDFContext { private Configuration jconf = null; private HashMap<UDFContextKey, Properties> udfConfs; private Properties clientSysProps; - private static final String CLIENT_SYS_PROPS = "pig.client.sys.props"; - private static final String UDF_CONTEXT = "pig.udf.context"; + + static final String CLIENT_SYS_PROPS = "pig.client.sys.props"; + static final String UDF_CONTEXT = "pig.udf.context"; private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() { @Override @@ -43,10 +42,6 @@ public class UDFContext { } }; - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class); - } - private UDFContext() { udfConfs = new HashMap<UDFContextKey, Properties>(); } @@ -68,8 +63,8 @@ public class UDFContext { /* * internal pig use only - should NOT be called from user code */ - @StaticDataCleanup - public static void cleanupStaticData() { + //@StaticDataCleanup + public static void staticDataCleanup() { tss = new ThreadLocal<UDFContext>() { @Override public UDFContext initialValue() { @@ -81,6 +76,14 @@ public class UDFContext { /* * internal pig use only - should NOT be called from user code */ + HashMap<UDFContextKey, Properties> getUdfConfs() { + return udfConfs; + } + + + /* + * internal pig use only - should NOT be called from user code + */ public void setClientSystemProps(Properties properties) { clientSysProps = properties; } @@ -201,6 +204,7 @@ public class UDFContext { conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps)); } + /** * Populate the udfConfs field. This function is intended to * be called by Map.configure or Reduce.configure on the backend. @@ -255,23 +259,31 @@ public class UDFContext { * it holds the class and args of the udf, and * implements equals() and hashCode() */ - private static class UDFContextKey implements Serializable{ + static class UDFContextKey implements Serializable{ private static final long serialVersionUID = 1; private String className; private String[] args; - UDFContextKey(){ - } - UDFContextKey(String className, String [] args){ this.className = className; this.args = args; } - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ + String getClassName() { + return className; + } + + String[] getArgs() { + return args; + } + + @Override + public String toString() { + return "UDFContextKey [className=" + className + ", args=" + + Arrays.toString(args) + "]"; + } + @Override public int hashCode() { final int prime = 31; @@ -282,9 +294,6 @@ public class UDFContext { return result; } - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ @Override public boolean equals(Object obj) { if (this == obj) Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Fri Mar 4 18:17:39 2016 @@ -255,6 +255,13 @@ public class Utils { return schema; } + public static Object parseConstant(String constantString) throws ParserException { + QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), + "util", new HashMap<String, String>() ) ; + Object constant = queryParser.parseConstant(constantString); + return constant; + } + /** * This method adds FieldSchema of 'input source tag/path' as the first * field. This will be called only when PigStorage is invoked with @@ -616,16 +623,9 @@ public class Utils { * @throws IOException */ - public static Path depthFirstSearchForFile(final FileStatus fileStatus, - final FileSystem fileSystem) throws IOException { - if (fileSystem.isFile(fileStatus.getPath())) { - return fileStatus.getPath(); - } else { - return depthFirstSearchForFile( - fileSystem.listStatus(fileStatus.getPath(), VISIBLE_FILES), - fileSystem); - } - + public static Path depthFirstSearchForFile(final FileStatus[] statusArray, + final FileSystem fileSystem) throws IOException { + return depthFirstSearchForFile(statusArray, fileSystem, null); } /** @@ -637,7 +637,7 @@ public class Utils { * @throws IOException */ public static Path depthFirstSearchForFile(final FileStatus[] statusArray, - final FileSystem fileSystem) throws IOException { + final FileSystem fileSystem, PathFilter filter) throws IOException { // Most recent files first Arrays.sort(statusArray, @@ -650,10 +650,17 @@ public class Utils { ); for (FileStatus f : statusArray) { - Path p = depthFirstSearchForFile(f, fileSystem); - if (p != null) { - return p; - } + if (fileSystem.isFile(f.getPath())) { + if (filter == null || filter.accept(f.getPath())) { + return f.getPath(); + } else { + continue; + } + } else { + return depthFirstSearchForFile( + fileSystem.listStatus(f.getPath(), VISIBLE_FILES), + fileSystem, filter); + } } return null; Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroBagWrapper.java Fri Mar 4 18:17:39 2016 @@ -99,7 +99,7 @@ public final class AvroBagWrapper<T> imp if (arg instanceof IndexedRecord) { return new AvroTupleWrapper<IndexedRecord>((IndexedRecord) arg); } else { - return TupleFactory.getInstance().newTuple(AvroTupleWrapper.unionResolver(arg)); + return TupleFactory.getInstance().newTuple(AvroTupleWrapper.getPigObject(arg)); } } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Fri Mar 4 18:17:39 2016 @@ -20,6 +20,7 @@ package org.apache.pig.impl.util.avro; import java.util.AbstractMap; import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -65,6 +66,10 @@ public final class AvroMapWrapper implem @Override public boolean containsKey(final Object key) { + if (isUtf8key && !(key instanceof Utf8)) { + // Assuming keys can either be utf8 or string + return innerMap.containsKey(new Utf8((String) key)); + } return innerMap.containsKey(key); } @@ -81,12 +86,7 @@ public final class AvroMapWrapper implem } else { v = innerMap.get(key); } - - if (v instanceof Utf8) { - return v.toString(); - } else { - return v; - } + return AvroTupleWrapper.getPigObject(v); } @Override @@ -112,6 +112,13 @@ public final class AvroMapWrapper implem @Override public Set<CharSequence> keySet() { + if (isUtf8key) { + final Set<CharSequence> keySet = new HashSet<CharSequence>(); + for (CharSequence cs : innerMap.keySet()) { + keySet.add(cs.toString()); + } + return keySet; + } return innerMap.keySet(); } @@ -122,11 +129,7 @@ public final class AvroMapWrapper implem new Function() { @Override public Object apply(final Object v) { - if (v instanceof Utf8) { - return v.toString(); - } else { - return v; - } + return AvroTupleWrapper.getPigObject(v); } } ); @@ -138,18 +141,13 @@ public final class AvroMapWrapper implem Sets.newHashSetWithExpectedSize(innerMap.size()); for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) { CharSequence k = e.getKey(); - Object v = e.getValue(); + final Object v = AvroTupleWrapper.getPigObject(e.getValue()); if (k instanceof Utf8) { k = k.toString(); } - if (v instanceof Utf8) { - v = v.toString(); - } theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v)); } - return theSet; - } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Fri Mar 4 18:17:39 2016 @@ -21,11 +21,14 @@ package org.apache.pig.impl.util.avro; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; @@ -54,35 +57,7 @@ public class AvroStorageDataConversionUt for (Field f : s.getFields()) { Object o = t.get(f.pos()); Schema innerSchema = f.schema(); - if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) { - if (o == null) { - record.put(f.pos(), null); - continue; - } - innerSchema = AvroStorageSchemaConversionUtilities - .removeSimpleUnion(innerSchema); - } - switch(innerSchema.getType()) { - case RECORD: - record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema)); - break; - case ARRAY: - record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema)); - break; - case BYTES: - record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get())); - break; - case FIXED: - record.put(f.pos(), new GenericData.Fixed( - innerSchema, ((DataByteArray) o).get())); - break; - default: - if (t.getType(f.pos()) == DataType.DATETIME) { - record.put(f.pos(), ((DateTime) o).getMillis() ); - } else { - record.put(f.pos(), o); - } - } + record.put(f.pos(), packIntoAvro(o, innerSchema)); } return record; } catch (Exception e) { @@ -123,5 +98,52 @@ public class AvroStorageDataConversionUt } } + private static Object packIntoAvro(final Object o, Schema s) + throws IOException { + if (AvroStorageSchemaConversionUtilities.isNullableUnion(s)) { + if (o == null) { + return null; + } + s = AvroStorageSchemaConversionUtilities.removeSimpleUnion(s); + } + // what if o == null and schema doesn't allow it ? + switch (s.getType()) { + case RECORD: + return packIntoAvro((Tuple) o, s); + case ARRAY: + return packIntoAvro((DataBag) o, s); + case MAP: + return packIntoAvro((Map<CharSequence, Object>) o, s); + case BYTES: + return ByteBuffer.wrap(((DataByteArray) o).get()); + case FIXED: + return new GenericData.Fixed(s, ((DataByteArray) o).get()); + default: + if (DataType.findType(o) == DataType.DATETIME) { + return ((DateTime) o).getMillis(); + } else { + return o; + } + } + } + private static Map<Utf8, Object> packIntoAvro(Map<CharSequence, Object> input, Schema schema) + throws IOException { + final Map<Utf8, Object> output = new HashMap<Utf8, Object>(); + for (Map.Entry<CharSequence, Object> e : input.entrySet()) { + final Utf8 k = utf8(e.getKey()); + output.put(k, packIntoAvro(e.getValue(), schema.getValueType())); + } + return output; + } + + private static Utf8 utf8(CharSequence v) { + if (v instanceof Utf8) { + return (Utf8) v; + } else { + final StringBuilder sb = new StringBuilder(v.length()); + sb.append(v); + return new Utf8(sb.toString()); + } + } }
