olga
Sat, 27 Sep 2008 12:44:39 -0700
Author: olga Date: Sat Sep 27 12:43:46 2008 New Revision: 699726 URL: http://svn.apache.org/viewvc?rev=699726&view=rev Log: PIG-463: POCast changes Modified: incubator/pig/branches/types/CHANGES.txt incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Modified: incubator/pig/branches/types/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699726&r1=699725&r2=699726&view=diff ============================================================================== --- incubator/pig/branches/types/CHANGES.txt (original) +++ incubator/pig/branches/types/CHANGES.txt Sat Sep 27 12:43:46 2008 @@ -257,3 +257,5 @@ PIG-443: Illustrate for the Types branch (shubham via olgan) PIG-376: set job name (olgan) + + PIG-463: POCast changes (pradeepk via olgan) Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=699726&r1=699725&r2=699726&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Sat Sep 27 12:43:46 2008 @@ -47,6 +47,7 @@ private String loadFSpec; transient private LoadFunc load; private Log log = LogFactory.getLog(getClass()); + private boolean castNotNeeded = false; private static final long serialVersionUID = 1L; @@ -90,6 +91,7 @@ @Override public Result getNext(Integer i) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = 
DataType.INTEGER; Byte resultType = in.getResultType(); switch(resultType) { case DataType.BAG : { @@ -109,7 +111,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new Integer(Integer.valueOf((((DataByteArray)res.result).toString()))); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToInteger(dba.get()); } catch (IOException e) { @@ -187,6 +212,7 @@ @Override public Result getNext(Long l) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.LONG; Byte resultType = in.getResultType(); switch(resultType) { case DataType.BAG : { @@ -211,8 +237,31 @@ DataByteArray dba = null; Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } //res.result = new Long(Long.valueOf((((DataByteArray)res.result).toString()))); - dba = (DataByteArray) res.result; + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + 
castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToLong(dba.get()); } catch (IOException e) { @@ -285,6 +334,7 @@ @Override public Result getNext(Double d) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.DOUBLE; Byte resultType = in.getResultType(); switch(resultType) { case DataType.BAG : { @@ -310,7 +360,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new Double(Double.valueOf((((DataByteArray)res.result).toString()))); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToDouble(dba.get()); } catch (IOException e) { @@ -382,6 +455,7 @@ @Override public Result getNext(Float f) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.FLOAT; Byte resultType = in.getResultType(); switch(resultType) { case DataType.BAG : { @@ -407,7 +481,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new Float(Float.valueOf((((DataByteArray)res.result).toString()))); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type 
as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToFloat(dba.get()); } catch (IOException e) { @@ -481,6 +578,7 @@ @Override public Result getNext(String str) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.CHARARRAY; Byte resultType = in.getResultType(); switch(resultType) { case DataType.BAG : { @@ -506,7 +604,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new String(((DataByteArray)res.result).toString()); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToCharArray(dba.get()); } catch (IOException e) { @@ -580,6 +701,7 @@ @Override public Result getNext(Tuple t) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.TUPLE; Byte resultType = in.getResultType(); switch(resultType) { @@ -593,7 +715,30 @@ Result res = 
in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new String(((DataByteArray)res.result).toString()); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToTuple(dba.get()); } catch (IOException e) { @@ -634,6 +779,7 @@ @Override public Result getNext(DataBag bag) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.BAG; Byte resultType = in.getResultType(); switch(resultType) { @@ -647,7 +793,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new String(((DataByteArray)res.result).toString()); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToBag(dba.get()); } catch 
(IOException e) { @@ -688,6 +857,7 @@ @Override public Result getNext(Map m) throws ExecException { PhysicalOperator in = inputs.get(0); + Byte castToType = DataType.MAP; Byte resultType = in.getResultType(); switch(resultType) { @@ -701,7 +871,30 @@ Result res = in.getNext(dba); if(res.returnStatus == POStatus.STATUS_OK && res.result != null) { //res.result = new String(((DataByteArray)res.result).toString()); - dba = (DataByteArray) res.result; + if(castNotNeeded) { + // we examined the data once before and + // determined that the input is the same + // type as the type we are casting to + // so just send the input out as output + return res; + } + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + // check if the type of res.result is + // same as the type we are trying to cast to + if(DataType.findType(res.result) == castToType) { + // remember this for future calls + castNotNeeded = true; + // just return the output + return res; + } else { + // the input is a different type + // rethrow the exception + throw e; + } + + } try { res.result = load.bytesToMap(dba.get()); } catch (IOException e) { Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=699726&r1=699725&r2=699726&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Sat Sep 27 12:43:46 2008 @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URL; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -29,6 +30,7 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; +import org.apache.pig.data.DefaultBagFactory; import 
org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.io.BufferedPositionedInputStream; @@ -732,6 +734,7 @@ //System.out.println(res.result + " : " + i); assertEquals(i, res.result); } + } { @@ -825,6 +828,155 @@ } } + private PhysicalPlan constructPlan(POCast op) throws PlanException { + LoadFunc load = new TestLoader(); + op.setLoadFSpec(load.getClass().getName()); + POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + PhysicalPlan plan = new PhysicalPlan(); + plan.add(prj); + plan.add(op); + plan.connect(prj, op); + prj.setResultType(DataType.BYTEARRAY); + return plan; + } + + /* + * Test that if the input type is actually same + * as output type and we think that the input type is a + * bytearray we still can handle it. This can happen in the + * following situation: + * If a map in pig (say returned from a UDF) has a key with + * the value being a string, then a lookup of that key being used + * in a context which expects a string will cause an implicit cast + * to a string. This is because the Pig frontend (logical layer) + * thinks of all map "values" as bytearrays and hence introduces + * a Cast to convert the bytearray to string. 
Though in reality + * the input to the cast is already a string + */ + @Test + public void testByteArrayToOtherNoCast() throws PlanException, ExecException { + POCast op = new POCast(new OperatorKey("", r.nextLong()), -1); + PhysicalPlan plan = constructPlan(op); + TupleFactory tf = TupleFactory.getInstance(); + + { + Tuple t = tf.newTuple(); + Integer input = new Integer(r.nextInt()); + t.append(input); + plan.attachInput(t); + Result res = op.getNext(new Integer(0)); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + i); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + Float input = new Float(r.nextFloat()); + t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(new Float(0)); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + i); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + Long input = new Long(r.nextLong()); + t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(new Long(0)); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + i); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + Double input = new Double(r.nextDouble()); + 
t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(new Double(0)); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + i); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + Tuple input = GenRandomData.genRandSmallTuple("test", 1); + t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(tf.newTuple()); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + str); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + DataBag input = GenRandomData.genRandSmallTupDataBag(r, 10, 100); + t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(DefaultBagFactory.getInstance().newDefaultBag()); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + str); + assertEquals(input, res.result); + } + } + + { + // create a new POCast each time since we + // maintain a state variable per POCast object + // indicating if cast is really required + POCast newOp = new POCast(new OperatorKey("", r.nextLong()), -1); + plan = constructPlan(newOp); + Tuple t = tf.newTuple(); + Map<Object, Object> input = new HashMap<Object, Object>(); + input.put("key1", "value1"); + input.put("key2", "value2"); + t.append(input); + plan.attachInput(t); + Result res = newOp.getNext(new HashMap<Object, Object>()); + if(res.returnStatus == POStatus.STATUS_OK) { + //System.out.println(res.result + " : " + str); + assertEquals(input, 
res.result); + } + } + + } + @Test public void testTupleToOther() throws PlanException, ExecException { POCast op = new POCast(new OperatorKey("", r.nextLong()), -1);