http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java index e5d7155..341d87e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java @@ -29,6 +29,8 @@ import org.apache.flink.api.common.operators.base.CrossOperatorBase; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.operators.translation.PlanProjectOperator; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -37,16 +39,15 @@ import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertNotNull; -/** - * Created by sebastian on 6/19/14. - */ public class SemanticPropertiesProjectionTest { final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>(); + final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>( BasicTypeInfo.INT_TYPE_INFO, @@ -56,184 +57,228 @@ public class SemanticPropertiesProjectionTest { BasicTypeInfo.INT_TYPE_INFO ); + final List<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> emptyNestedTupleData = + new ArrayList<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>>(); + + final TupleTypeInfo<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> nestedTupleTypeInfo = new + TupleTypeInfo<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>>( + BasicTypeInfo.INT_TYPE_INFO, + new TupleTypeInfo<Tuple3<String, Integer, Long>>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO), + new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO), + BasicTypeInfo.STRING_TYPE_INFO + ); @Test - public void ProjectOperatorTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - tupleDs.project(1, 3, 2).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - PlanProjectOperator<?, ?> projectOperator = ((PlanProjectOperator<?, ?>) sink.getInput()); - - SingleInputSemanticProperties props = projectOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField(1).size() == 1); - assertTrue(props.getForwardedField(3).size() == 1); - assertTrue(props.getForwardedField(2).size() == 1); - assertTrue(props.getForwardedField(1).contains(0)); - assertTrue(props.getForwardedField(3).contains(1)); - assertTrue(props.getForwardedField(2).contains(2)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testProjectionSemProps1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + tupleDs.project(1, 3, 2).project(0, 3).print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + PlanProjectOperator<?, ?> projectOperator = ((PlanProjectOperator<?, ?>) sink.getInput()); + + SingleInputSemanticProperties props = projectOperator.getSemanticProperties(); + + assertEquals(1, props.getForwardingTargetFields(0, 0).size()); + assertEquals(1, props.getForwardingTargetFields(0, 1).size()); + assertEquals(1, props.getForwardingTargetFields(0, 2).size()); + assertEquals(2, props.getForwardingTargetFields(0, 3).size()); + + assertTrue(props.getForwardingTargetFields(0, 1).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 3).contains(1)); + assertTrue(props.getForwardingTargetFields(0, 2).contains(2)); + assertTrue(props.getForwardingTargetFields(0, 0).contains(3)); + assertTrue(props.getForwardingTargetFields(0, 3).contains(4)); } - + @Test - public void ProjectOperatorWithoutTypesClassTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - tupleDs.project(1, 3).project(2).project(0).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - PlanProjectOperator<?, ?> projectOperator = ((PlanProjectOperator<?, ?>) sink.getInput()); - - SingleInputSemanticProperties props = projectOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField(1).size() == 1); - assertTrue(props.getForwardedField(3).size() == 1); - assertTrue(props.getForwardedField(2).size() == 1); - assertTrue(props.getForwardedField(0).size() == 1); - assertTrue(props.getForwardedField(1).contains(0)); - assertTrue(props.getForwardedField(3).contains(1)); - assertTrue(props.getForwardedField(2).contains(2)); - assertTrue(props.getForwardedField(0).contains(3)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testProjectionSemProps2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo); + + tupleDs.project(2, 3, 1).project(2).print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + PlanProjectOperator<?, ?> projectOperator = ((PlanProjectOperator<?, ?>) sink.getInput()); + + SingleInputSemanticProperties props = projectOperator.getSemanticProperties(); + + assertNotNull(props.getForwardingTargetFields(0, 0)); + assertEquals(1, props.getForwardingTargetFields(0, 1).size()); + assertEquals(1, props.getForwardingTargetFields(0, 2).size()); + assertEquals(1, props.getForwardingTargetFields(0, 3).size()); + assertEquals(2, props.getForwardingTargetFields(0, 4).size()); + assertEquals(2, props.getForwardingTargetFields(0, 5).size()); + assertEquals(1, props.getForwardingTargetFields(0, 6).size()); + assertEquals(0, props.getForwardingTargetFields(0, 0).size()); + + assertTrue(props.getForwardingTargetFields(0, 4).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 5).contains(1)); + assertTrue(props.getForwardingTargetFields(0, 6).contains(2)); + assertTrue(props.getForwardingTargetFields(0, 1).contains(3)); + assertTrue(props.getForwardingTargetFields(0, 2).contains(4)); + assertTrue(props.getForwardingTargetFields(0, 3).contains(5)); + assertTrue(props.getForwardingTargetFields(0, 4).contains(6)); + assertTrue(props.getForwardingTargetFields(0, 5).contains(7)); } @Test - public void JoinProjectionTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 4).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput()); - - DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField1(2).size() == 1); - assertTrue(props.getForwardedField1(3).size() == 1); - assertTrue(props.getForwardedField2(1).size() == 1); - assertTrue(props.getForwardedField2(4).size() == 1); - assertTrue(props.getForwardedField1(2).contains(0)); - assertTrue(props.getForwardedField1(3).contains(1)); - assertTrue(props.getForwardedField2(1).contains(2)); - assertTrue(props.getForwardedField2(4).contains(3)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testJoinProjectionSemProps1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + tupleDs.join(tupleDs).where(0).equalTo(0) + .projectFirst(2, 3) + .projectSecond(1, 4) + .print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput()); + + DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties(); + + assertEquals(1, props.getForwardingTargetFields(0, 2).size()); + assertEquals(1, props.getForwardingTargetFields(0, 3).size()); + assertEquals(1, props.getForwardingTargetFields(1, 1).size()); + assertEquals(1, props.getForwardingTargetFields(1, 4).size()); + + assertTrue(props.getForwardingTargetFields(0, 2).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 3).contains(1)); + assertTrue(props.getForwardingTargetFields(1, 1).contains(2)); + assertTrue(props.getForwardingTargetFields(1, 4).contains(3)); } - + @Test - public void JoinProjectionWithoutTypesClassTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2).projectFirst(3).projectSecond(1, 4).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput()); - - DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField1(2).size() == 1); - assertTrue(props.getForwardedField1(3).size() == 1); - assertTrue(props.getForwardedField2(1).size() == 1); - assertTrue(props.getForwardedField2(4).size() == 1); - assertTrue(props.getForwardedField1(2).contains(0)); - assertTrue(props.getForwardedField1(3).contains(1)); - assertTrue(props.getForwardedField2(1).contains(2)); - assertTrue(props.getForwardedField2(4).contains(3)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testJoinProjectionSemProps2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo); + + tupleDs.join(tupleDs).where(0).equalTo(0) + .projectFirst(2,0) + .projectSecond(1,3) + .print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> projectJoinOperator = ((JoinOperatorBase<?, ?, ?, ?>) sink.getInput()); + + DualInputSemanticProperties props = projectJoinOperator.getSemanticProperties(); + + assertEquals(1, props.getForwardingTargetFields(0, 0).size()); + assertNotNull(props.getForwardingTargetFields(0, 1)); + assertNotNull(props.getForwardingTargetFields(0, 2)); + assertNotNull(props.getForwardingTargetFields(0, 3)); + assertEquals(1, props.getForwardingTargetFields(0, 4).size()); + assertEquals(1, props.getForwardingTargetFields(0, 5).size()); + assertNotNull(props.getForwardingTargetFields(0, 6)); + assertEquals(0, props.getForwardingTargetFields(0, 1).size()); + assertEquals(0, props.getForwardingTargetFields(0, 2).size()); + assertEquals(0, props.getForwardingTargetFields(0, 3).size()); + assertEquals(0, props.getForwardingTargetFields(0, 6).size()); + + assertNotNull(props.getForwardingTargetFields(1, 0)); + assertEquals(1, props.getForwardingTargetFields(1, 1).size()); + assertEquals(1, props.getForwardingTargetFields(1, 2).size()); + assertEquals(1, props.getForwardingTargetFields(1, 3).size()); + assertNotNull(props.getForwardingTargetFields(1, 4)); + assertNotNull(props.getForwardingTargetFields(1, 5)); + assertEquals(1, props.getForwardingTargetFields(1, 6).size()); + assertEquals(0, props.getForwardingTargetFields(1, 0).size()); + assertEquals(0, props.getForwardingTargetFields(1, 4).size()); + assertEquals(0, props.getForwardingTargetFields(1, 5).size()); + + assertTrue(props.getForwardingTargetFields(0, 4).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 5).contains(1)); + assertTrue(props.getForwardingTargetFields(0, 0).contains(2)); + assertTrue(props.getForwardingTargetFields(1, 1).contains(3)); + assertTrue(props.getForwardingTargetFields(1, 2).contains(4)); + assertTrue(props.getForwardingTargetFields(1, 3).contains(5)); + assertTrue(props.getForwardingTargetFields(1, 6).contains(6)); } @Test - public void CrossProjectionTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4); - result.print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = ((CrossOperatorBase<?, ?, ?, ?>) sink.getInput()); - - DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField1(2).size() == 1); - assertTrue(props.getForwardedField1(3).size() == 1); - assertTrue(props.getForwardedField2(1).size() == 1); - assertTrue(props.getForwardedField2(4).size() == 1); - assertTrue(props.getForwardedField1(2).contains(0)); - assertTrue(props.getForwardedField1(3).contains(1)); - assertTrue(props.getForwardedField2(1).contains(2)); - assertTrue(props.getForwardedField2(4).contains(3)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testCrossProjectionSemProps1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + tupleDs.cross(tupleDs) + .projectFirst(2, 3) + .projectSecond(1, 4) + .print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = ((CrossOperatorBase<?, ?, ?, ?>) sink.getInput()); + + DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties(); + + assertEquals(1, props.getForwardingTargetFields(0, 2).size()); + assertEquals(1, props.getForwardingTargetFields(0, 3).size()); + assertEquals(1, props.getForwardingTargetFields(1, 1).size()); + assertEquals(1, props.getForwardingTargetFields(1, 4).size()); + + assertTrue(props.getForwardingTargetFields(0, 2).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 3).contains(1)); + assertTrue(props.getForwardingTargetFields(1, 1).contains(2)); + assertTrue(props.getForwardingTargetFields(1, 4).contains(3)); } - + @Test - public void CrossProjectionWithoutTypesClassTest() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); - - DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4); - result.print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = ((CrossOperatorBase<?, ?, ?, ?>) sink.getInput()); - - DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties(); - - assertTrue(props.getForwardedField1(2).size() == 1); - assertTrue(props.getForwardedField1(3).size() == 1); - assertTrue(props.getForwardedField2(1).size() == 1); - assertTrue(props.getForwardedField2(4).size() == 1); - assertTrue(props.getForwardedField1(2).contains(0)); - assertTrue(props.getForwardedField1(3).contains(1)); - assertTrue(props.getForwardedField2(1).contains(2)); - assertTrue(props.getForwardedField2(4).contains(3)); - } catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testCrossProjectionSemProps2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple4<Integer, Tuple3<String, Integer, Long>, Tuple2<Long, Long>, String>> tupleDs = env.fromCollection(emptyNestedTupleData, nestedTupleTypeInfo); + + tupleDs.cross(tupleDs) + .projectFirst(2, 0) + .projectSecond(1,3) + .print(); + + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + CrossOperatorBase<?, ?, ?, ?> projectCrossOperator = ((CrossOperatorBase<?, ?, ?, ?>) sink.getInput()); + + DualInputSemanticProperties props = projectCrossOperator.getSemanticProperties(); + + assertEquals(1, props.getForwardingTargetFields(0, 0).size()); + assertNotNull(props.getForwardingTargetFields(0, 1)); + assertNotNull(props.getForwardingTargetFields(0, 2)); + assertNotNull(props.getForwardingTargetFields(0, 3)); + assertEquals(1, props.getForwardingTargetFields(0, 4).size()); + assertEquals(1, props.getForwardingTargetFields(0, 5).size()); + assertNotNull(props.getForwardingTargetFields(0, 6)); + assertEquals(0, props.getForwardingTargetFields(0, 1).size()); + assertEquals(0, props.getForwardingTargetFields(0, 2).size()); + assertEquals(0, props.getForwardingTargetFields(0, 3).size()); + assertEquals(0, props.getForwardingTargetFields(0, 6).size()); + + assertNotNull(props.getForwardingTargetFields(1, 0)); + assertEquals(1, props.getForwardingTargetFields(1, 1).size()); + assertEquals(1, props.getForwardingTargetFields(1, 2).size()); + assertEquals(1, props.getForwardingTargetFields(1, 3).size()); + assertNotNull(props.getForwardingTargetFields(1, 4)); + assertNotNull(props.getForwardingTargetFields(1, 5)); + assertEquals(1, props.getForwardingTargetFields(1, 6).size()); + assertEquals(0, props.getForwardingTargetFields(1, 0).size()); + assertEquals(0, props.getForwardingTargetFields(1, 4).size()); + assertEquals(0, props.getForwardingTargetFields(1, 5).size()); + + assertTrue(props.getForwardingTargetFields(0, 4).contains(0)); + assertTrue(props.getForwardingTargetFields(0, 5).contains(1)); + assertTrue(props.getForwardingTargetFields(0, 0).contains(2)); + assertTrue(props.getForwardingTargetFields(1, 1).contains(3)); + assertTrue(props.getForwardingTargetFields(1, 2).contains(4)); + assertTrue(props.getForwardingTargetFields(1, 3).contains(5)); + assertTrue(props.getForwardingTargetFields(1, 6).contains(6)); } + }
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java index bd8b55e..f0124e3 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesTranslationTest.java @@ -22,17 +22,18 @@ package org.apache.flink.api.java.functions; import static org.junit.Assert.*; import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.RichJoinFunction; -import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.DualInputSemanticProperties; import org.apache.flink.api.common.operators.GenericDataSinkBase; +import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.base.JoinOperatorBase; import org.apache.flink.api.common.operators.base.MapOperatorBase; import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.junit.Test; @@ -43,247 +44,496 @@ import org.apache.flink.api.java.ExecutionEnvironment; /** * This is a minimal test to verify that semantic annotations are evaluated against * the type information properly translated correctly to the common data flow API. - * - * This covers only the constant fields annotations currently !!! + * */ @SuppressWarnings("serial") public class SemanticPropertiesTranslationTest { - /** - * A mapper that preserves all fields over a tuple data set. - */ @Test - public void translateUnaryFunctionAnnotationTuplesWildCard() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings("unchecked") - DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42)); - input.map(new WildcardConstantMapper<Tuple3<Long,String,Integer>>()).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); - - SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); - - FieldSet fw1 = semantics.getForwardedField(0); - FieldSet fw2 = semantics.getForwardedField(1); - FieldSet fw3 = semantics.getForwardedField(2); - - assertNotNull(fw1); - assertNotNull(fw2); - assertNotNull(fw3); - - assertTrue(fw1.contains(0)); - assertTrue(fw2.contains(1)); - assertTrue(fw3.contains(2)); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testUnaryFunctionWildcardForwardedAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42)); + input.map(new WildcardForwardedMapper<Tuple3<Long,String,Integer>>()).print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 1); + FieldSet fw3 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertNotNull(fw3); + assertTrue(fw1.contains(0)); + assertTrue(fw2.contains(1)); + assertTrue(fw3.contains(2)); } - /** - * A mapper that preserves fields 0, 1, 2 of a tuple data set. - */ @Test - public void translateUnaryFunctionAnnotationTuples() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings("unchecked") - DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42)); - input.map(new IndividualConstantMapper<Long, String, Integer>()).print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); - - SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); - - FieldSet fw1 = semantics.getForwardedField(0); - FieldSet fw2 = semantics.getForwardedField(1); - FieldSet fw3 = semantics.getForwardedField(2); - - assertNotNull(fw1); - assertNotNull(fw2); - assertNotNull(fw3); - - assertTrue(fw1.contains(0)); - assertTrue(fw2.contains(1)); - assertTrue(fw3.contains(2)); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + public void testUnaryFunctionInPlaceForwardedAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, String, Integer>> input = env.fromElements(new Tuple3<Long, String, Integer>(3l, "test", 42)); + input.map(new IndividualForwardedMapper<Long, String, Integer>()).print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertTrue(fw1.contains(0)); + assertTrue(fw2.contains(2)); } - -// /** -// * A mapper that preserves all fields over a data set of an atomic type. -// */ -// @Test -// public void translateUnaryFunctionAnnotationAtomicWildCard() { -// try { -// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -// DataSet<Long> input = env.generateSequence(0, 1000); -// input.map(new WildcardConstantMapper<Long>()).print(); -// -// Plan plan = env.createProgramPlan(); -// -// GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); -// MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); -// -// SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); -// -// FieldSet fw1 = semantics.getForwardedField(0); -// assertNotNull(fw1); -// assertTrue(fw1.contains(0)); -// } -// catch (Exception e) { -// System.err.println(e.getMessage()); -// e.printStackTrace(); -// fail("Exception in test: " + e.getMessage()); -// } -// } - -// /** -// * A mapper that preserves field zero of a data set of an atomic type. -// */ -// @Test -// public void translateUnaryFunctionAnnotationAtomicZero() { -// try { -// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -// DataSet<Long> input = env.generateSequence(0, 1000); -// input.map(new ZeroConstantMapper<Long>()).print(); -// -// Plan plan = env.createProgramPlan(); -// -// GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); -// MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); -// -// SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); -// -// FieldSet fw1 = semantics.getForwardedField(0); -// FieldSet fw2 = semantics.getForwardedField(1); -// FieldSet fw3 = semantics.getForwardedField(2); -// -// assertNotNull(fw1); -// assertNotNull(fw2); -// assertNotNull(fw3); -// -// assertTrue(fw1.contains(0)); -// assertTrue(fw2.contains(1)); -// assertTrue(fw3.contains(2)); -// } -// catch (Exception e) { -// System.err.println(e.getMessage()); -// e.printStackTrace(); -// fail("Exception in test: " + e.getMessage()); -// } -// } - - - /** - * A join that preserves tuple fields from both sides. - */ + @Test - public void translateBinaryFunctionAnnotationTuples() { - try { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - @SuppressWarnings("unchecked") - DataSet<Tuple2<Long, String>> input1 = env.fromElements(new Tuple2<Long, String>(3l, "test")); - @SuppressWarnings("unchecked") - DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new Tuple2<Long, Double>(3l, 3.1415)); - - input1.join(input2).where(0).equalTo(0).with(new ForwardingTupleJoin<Long, String, Long, Double>()) + public void testUnaryFunctionMovingForwardedAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new ShufflingMapper<Long>()).print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 1); + FieldSet fw3 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertNotNull(fw3); + assertTrue(fw1.contains(2)); + assertTrue(fw2.contains(0)); + assertTrue(fw3.contains(1)); + } + + @Test + public void testUnaryFunctionForwardedInLine1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new NoAnnotationMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2") .print(); - - Plan plan = env.createProgramPlan(); - - GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); - JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); - - DualInputSemanticProperties semantics = join.getSemanticProperties(); - - FieldSet fw11 = semantics.getForwardedField1(0); - FieldSet fw12 = semantics.getForwardedField1(1); - FieldSet fw21 = semantics.getForwardedField2(0); - FieldSet fw22 = semantics.getForwardedField2(1); - - assertNull(fw11); - assertNull(fw21); - - assertNotNull(fw12); - assertNotNull(fw22); - - assertTrue(fw12.contains(0)); - assertTrue(fw22.contains(1)); - } - catch (Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - fail("Exception in test: " + e.getMessage()); - } + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertTrue(fw1.contains(1)); + assertTrue(fw2.contains(2)); } - -// /** -// * A join that preserves atomic fields from both sides. -// */ -// @Test -// public void translateBinaryFunctionAnnotationAtomicType() { -// try { -// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); -// -// DataSet<Long> input1 = env.generateSequence(0, 1000); -// DataSet<Long> input2 = env.generateSequence(0, 1000); -// -// input1.join(input2) -// .where(new KeySelector<Long, Long>() { public Long getKey(Long value) { return value; }}) -// .equalTo(new KeySelector<Long, Long>() { public Long getKey(Long value) { return value; }}) -// .with(new ForwardingBasicJoin<Long, Long>()) -// .print(); -// -// Plan plan = env.createProgramPlan(); -// -// GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); -// JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); -// -// DualInputSemanticProperties semantics = join.getSemanticProperties(); -// -// FieldSet fw11 = semantics.getForwardedField1(0); -// FieldSet fw12 = semantics.getForwardedField1(1); -// FieldSet fw21 = semantics.getForwardedField2(0); -// FieldSet fw22 = semantics.getForwardedField2(1); -// -// assertNull(fw11); -// assertNull(fw21); -// -// assertNotNull(fw12); -// assertNotNull(fw22); -// -// assertTrue(fw12.contains(0)); -// assertTrue(fw22.contains(1)); -// } -// catch (Exception e) { -// System.err.println(e.getMessage()); -// e.printStackTrace(); -// fail("Exception in test: " + e.getMessage()); -// } -// } - + + @Test + public void testUnaryFunctionForwardedInLine2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertTrue(fw1.contains(1)); + assertTrue(fw2.contains(2)); + } + + @Test + public void testUnaryFunctionForwardedInLine3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertTrue(fw1.contains(1)); + assertTrue(fw2.contains(2)); + } + + @Test + public void testUnaryFunctionAllForwardedExceptAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet fw1 = semantics.getForwardingTargetFields(0, 0); + FieldSet fw2 = semantics.getForwardingTargetFields(0, 2); + assertNotNull(fw1); + assertNotNull(fw2); + assertTrue(fw1.contains(0)); + assertTrue(fw2.contains(2)); + } + + @Test + public void testUnaryFunctionReadFieldsAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new ReadSetMapper<Tuple3<Long, Long, Long>>()).print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + MapOperatorBase<?, ?, ?> mapper = (MapOperatorBase<?, ?, ?>) sink.getInput(); + + SingleInputSemanticProperties semantics = mapper.getSemanticProperties(); + + FieldSet read = semantics.getReadFields(0); + assertNotNull(read); + assertEquals(2, read.size()); + assertTrue(read.contains(0)); + assertTrue(read.contains(2)); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testUnaryForwardedOverwritingInLine1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new WildcardForwardedMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testUnaryForwardedOverwritingInLine2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input.map(new AllForwardedExceptMapper<Tuple3<Long, Long, Long>>()).withForwardedFields("0->1; 2"); + } + + @Test + public void testBinaryForwardedAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, String>> input1 = env.fromElements(new Tuple2<Long, String>(3l, "test")); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Double>> input2 = env.fromElements(new Tuple2<Long, Double>(3l, 3.1415)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, String, Long, Double>()) + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(0, 0)); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertEquals(1, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 1).size()); + assertTrue(semantics.getForwardingTargetFields(0, 1).contains(0)); + assertTrue(semantics.getForwardingTargetFields(1, 1).contains(1)); + assertEquals(0, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + } + + @Test + public void testBinaryForwardedInLine1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new NoAnnotationJoin<Long>()) + .withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertEquals(1, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(1, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 1).size()); + assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1)); + assertTrue(semantics.getForwardingTargetFields(0, 1).contains(2)); + assertTrue(semantics.getForwardingTargetFields(1, 1).contains(0)); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + } + + @Test + public void testBinaryForwardedInLine2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>()) + .withForwardedFieldsFirst("0->1; 1->2").withForwardedFieldsSecond("1->0") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertEquals(1, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(1, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 1).size()); + assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1)); + assertTrue(semantics.getForwardingTargetFields(0, 1).contains(2)); + assertTrue(semantics.getForwardingTargetFields(1, 1).contains(0)); + assertNotNull(semantics.getReadFields(0)); + assertNotNull(semantics.getReadFields(1)); + assertEquals(1, semantics.getReadFields(0).size()); + assertEquals(1, semantics.getReadFields(1).size()); + assertTrue(semantics.getReadFields(0).contains(1)); + assertTrue(semantics.getReadFields(1).contains(0)); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + } + + @Test + public void testBinaryForwardedAnnotationInLineMixed1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>()) + .withForwardedFieldsSecond("1") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(0, 1)); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertNotNull(semantics.getForwardingTargetFields(0, 0)); + assertNotNull(semantics.getForwardingTargetFields(1, 1)); + assertEquals(1, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 1).size()); + assertTrue(semantics.getForwardingTargetFields(0, 0).contains(2)); + assertTrue(semantics.getForwardingTargetFields(1, 1).contains(1)); + assertEquals(0, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + + } + + @Test + public void testBinaryForwardedAnnotationInLineMixed2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>()) + .withForwardedFieldsFirst("0->1") + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(0, 1)); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertNotNull(semantics.getForwardingTargetFields(0, 0)); + assertNotNull(semantics.getForwardingTargetFields(1, 1)); + assertEquals(1, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 1).size()); + assertTrue(semantics.getForwardingTargetFields(0, 0).contains(1)); + assertTrue(semantics.getForwardingTargetFields(1, 1).contains(2)); + assertEquals(0, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + } + + @Test + public void testBinaryAllForwardedExceptAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 4l, 5l)); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>()) + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getForwardingTargetFields(0, 0)); + assertNotNull(semantics.getForwardingTargetFields(0, 2)); + assertNotNull(semantics.getForwardingTargetFields(1, 0)); + assertNotNull(semantics.getForwardingTargetFields(1, 1)); + assertEquals(1, semantics.getForwardingTargetFields(0, 1).size()); + assertEquals(1, semantics.getForwardingTargetFields(1, 2).size()); + assertTrue(semantics.getForwardingTargetFields(0, 1).contains(1)); + assertTrue(semantics.getForwardingTargetFields(1, 2).contains(2)); + assertEquals(0, semantics.getForwardingTargetFields(0, 0).size()); + assertEquals(0, semantics.getForwardingTargetFields(0, 2).size()); + assertEquals(0, semantics.getForwardingTargetFields(1, 0).size()); + assertEquals(0, semantics.getForwardingTargetFields(1, 1).size()); + } + + @Test + public void testBinaryReadFieldsAnnotation() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ReadSetJoin<Long>()) + .print(); + Plan plan = env.createProgramPlan(); + + GenericDataSinkBase<?> sink = plan.getDataSinks().iterator().next(); + JoinOperatorBase<?, ?, ?, ?> join = (JoinOperatorBase<?, ?, ?, ?>) sink.getInput(); + + DualInputSemanticProperties semantics = join.getSemanticProperties(); + assertNotNull(semantics.getReadFields(0)); + assertNotNull(semantics.getReadFields(1)); + assertEquals(1, semantics.getReadFields(0).size()); + assertEquals(1, semantics.getReadFields(1).size()); + assertTrue(semantics.getReadFields(0).contains(1)); + assertTrue(semantics.getReadFields(1).contains(0)); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine1() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedFirstAnnotationJoin<Long>()) + .withForwardedFieldsFirst("0->1"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine2() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedSecondAnnotationJoin<Long>()) + .withForwardedFieldsSecond("0->1"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine3() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, Long, Long, Long>()) + .withForwardedFieldsFirst("0->1;"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine4() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(3l, 4l)); + @SuppressWarnings("unchecked") + DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(3l, 2l)); + input1.join(input2).where(0).equalTo(0).with(new ForwardedBothAnnotationJoin<Long, Long, Long, Long>()) + .withForwardedFieldsSecond("0->1;"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine5() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 4l, 5l)); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>()) + .withForwardedFieldsFirst("0->1;"); + } + + @Test(expected = SemanticProperties.InvalidSemanticAnnotationException.class) + public void testBinaryForwardedOverwritingInLine6() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input1 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 4l, 5l)); + @SuppressWarnings("unchecked") + DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(3l, 2l, 1l)); + input1.join(input2).where(0).equalTo(0).with(new AllForwardedExceptJoin<Long>()) + .withForwardedFieldsSecond("0->1;"); + } + // -------------------------------------------------------------------------------------------- + + public static class NoAnnotationMapper<T> implements MapFunction<T, T> { + + @Override + public T map(T value) { + return value; + } + } - - @ConstantFields("*") - public static class WildcardConstantMapper<T> extends RichMapFunction<T, T> { + @ForwardedFields("*") + public static class WildcardForwardedMapper<T> implements MapFunction<T, T> { @Override public T map(T value) { @@ -291,41 +541,95 @@ public class SemanticPropertiesTranslationTest { } } - @ConstantFields("0->0;1->1;2->2") - public static class IndividualConstantMapper<X, Y, Z> extends RichMapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> { + @ForwardedFields("0;2") + public static class IndividualForwardedMapper<X, Y, Z> implements MapFunction<Tuple3<X, Y, Z>, Tuple3<X, Y, Z>> { @Override public Tuple3<X, Y, Z> map(Tuple3<X, Y, Z> value) { return value; } } - - @ConstantFields("0") - public static class ZeroConstantMapper<T> extends RichMapFunction<T, T> { + + @ForwardedFields("0->2;1->0;2->1") + public static class ShufflingMapper<X> implements MapFunction<Tuple3<X, X, X>, Tuple3<X, X, X>> { + + @Override + public Tuple3<X, X, X> map(Tuple3<X, X, X> value) { + return value; + } + } + + @FunctionAnnotation.NonForwardedFields({"1"}) + public static class AllForwardedExceptMapper<T> implements MapFunction<T, T> { + + @Override + public T map(T value) { + return value; + } + } + + @FunctionAnnotation.ReadFields({"0;2"}) + public static class ReadSetMapper<T> implements MapFunction<T, T> { @Override public T map(T value) { return value; } } - - @ConstantFieldsFirst("1 -> 0") - @ConstantFieldsSecond("1 -> 1") - public static class ForwardingTupleJoin<A, B, C, D> extends RichJoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> { + + public static class NoAnnotationJoin<X> implements JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> { + + @Override + public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception { + return null; + } + } + + @ForwardedFieldsFirst("0->2") + public static class ForwardedFirstAnnotationJoin<X> implements JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> { + + @Override + public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception { + return null; + } + } + + @ForwardedFieldsSecond("1->2") + public static class ForwardedSecondAnnotationJoin<X> implements JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> { + + @Override + public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception { + return null; + } + } + + @ForwardedFieldsFirst("1 -> 0") + @ForwardedFieldsSecond("1 -> 1") + public static class ForwardedBothAnnotationJoin<A, B, C, D> implements JoinFunction<Tuple2<A, B>, Tuple2<C, D>, Tuple2<B, D>> { @Override public Tuple2<B, D> join(Tuple2<A, B> first, Tuple2<C, D> second) { return new Tuple2<B, D>(first.f1, second.f1); } } - - @ConstantFieldsFirst("0 -> 0") - @ConstantFieldsSecond("0 -> 1") - public static class ForwardingBasicJoin<A, B> extends RichJoinFunction<A, B, Tuple2<A, B>> { + + @FunctionAnnotation.NonForwardedFieldsFirst("0;2") + @FunctionAnnotation.NonForwardedFieldsSecond("0;1") + public static class AllForwardedExceptJoin<X> implements JoinFunction<Tuple3<X,X,X>, Tuple3<X,X,X>, Tuple3<X,X,X>> { + + @Override + public Tuple3<X, X, X> join(Tuple3<X, X, X> first, Tuple3<X, X, X> second) throws Exception { + return null; + } + } + + @FunctionAnnotation.ReadFieldsFirst("1") + @FunctionAnnotation.ReadFieldsSecond("0") + public static class ReadSetJoin<X> implements JoinFunction<Tuple2<X,X>, Tuple2<X,X>, Tuple3<X,X,X>> { @Override - public Tuple2<A, B> join(A first, B second) { - return new Tuple2<A, B>(first, second); + public Tuple3<X, X, X> join(Tuple2<X, X> first, Tuple2<X, X> second) throws Exception { + return null; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java index c8e5ba3..67d0240 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java @@ -106,16 +106,14 @@ public class KeysTest { new String[] {"f11"}, new String[] {"f-35"}, new String[] {"f0.f33"}, - new String[] {"f1.f33"}, - new String[] {"f1"} // select full tuple without saying "f1.*" + new String[] {"f1.f33"} }; for(int i = 0; i < tests.length; i++) { Throwable e = null; try { new ExpressionKeys<Tuple3<String, Tuple3<String, String, String>, String>>(tests[i], typeInfo); } catch(Throwable t) { - // System.err.println("Message: "+t.getMessage()); t.printStackTrace(); - e = t; + e = t; } Assert.assertNotNull(e); } @@ -127,16 +125,14 @@ public class KeysTest { String[][] tests = new String[][] { new String[] {"nonexistent"}, - new String[] {"date.abc"}, // nesting into unnested - new String[] {"word"} // select full tuple without saying "f1.*" + new String[] {"date.abc"} // nesting into unnested }; for(int i = 0; i < tests.length; i++) { Throwable e = null; try { new ExpressionKeys<ComplexNestedClass>(tests[i], ti); } catch(Throwable t) { - // System.err.println("Message: "+t.getMessage()); t.printStackTrace(); - e = t; + e = t; } Assert.assertNotNull(e); } @@ -214,6 +210,9 @@ public class KeysTest { complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f1.f0.*"}, complexTypeInfo); Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions()); + + complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f1.f0"}, complexTypeInfo); + Assert.assertArrayEquals(new int[] {1,2,3}, complexFpk.computeLogicalKeyPositions()); complexFpk = new ExpressionKeys<Tuple3<String, Tuple3<Tuple3<String, String, String>, String, String>, String>>(new String[] {"f2"}, complexTypeInfo); Assert.assertArrayEquals(new int[] {6}, complexFpk.computeLogicalKeyPositions()); @@ -246,6 +245,12 @@ public class KeysTest { ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p2.*"}, ti); Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions()); + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p1"}, ti); + Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions()); + + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"p2"}, ti); + Assert.assertArrayEquals(new int[] {3,4}, ek.computeLogicalKeyPositions()); + ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti); Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions()); } http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java index 9834a25..3f7e7c5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/CoGroupWrappingFunctionTest.java @@ -149,23 +149,8 @@ public class CoGroupWrappingFunctionTest { CoGroupOperator coGroupOp = CoGroupOperator.builder(new TestCoGroupFunction(), LongValue.class, 1, 2).build(); DualInputSemanticProperties props = coGroupOp.getSemanticProperties(); - FieldSet fw2 = props.getForwardedField1(2); - FieldSet fw4 = props.getForwardedField2(4); - - assertNotNull(fw2); - assertNotNull(fw4); - assertEquals(1, fw2.size()); - assertEquals(1, fw4.size()); - assertTrue(fw2.contains(2)); - assertTrue(fw4.contains(4)); - } - { - CoGroupOperator coGroupOp = CoGroupOperator.builder(TestCoGroupFunction.class, LongValue.class, 1, 2).build(); - - DualInputSemanticProperties props = coGroupOp.getSemanticProperties(); - FieldSet fw2 = props.getForwardedField1(2); - FieldSet fw4 = props.getForwardedField2(4); - + FieldSet fw2 = props.getForwardingTargetFields(0, 2); + FieldSet fw4 = props.getForwardingTargetFields(1, 4); assertNotNull(fw2); assertNotNull(fw4); assertEquals(1, fw2.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java index 9e262fc..f413b81 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/record/ReduceWrappingFunctionTest.java @@ -159,23 +159,8 @@ public class ReduceWrappingFunctionTest { ReduceOperator reduceOp = ReduceOperator.builder(new TestReduceFunction()).build(); SingleInputSemanticProperties props = reduceOp.getSemanticProperties(); - FieldSet fw2 = props.getForwardedField(2); - FieldSet fw4 = props.getForwardedField(4); - - assertNotNull(fw2); - assertNotNull(fw4); - assertEquals(1, fw2.size()); - assertEquals(1, fw4.size()); - assertTrue(fw2.contains(2)); - assertTrue(fw4.contains(4)); - } - { - ReduceOperator reduceOp = ReduceOperator.builder(TestReduceFunction.class).build(); - - SingleInputSemanticProperties props = reduceOp.getSemanticProperties(); - FieldSet fw2 = props.getForwardedField(2); - FieldSet fw4 = props.getForwardedField(4); - + FieldSet fw2 = props.getForwardingTargetFields(0, 2); + FieldSet fw4 = props.getForwardingTargetFields(0, 4); assertNotNull(fw2); assertNotNull(fw4); assertEquals(1, fw2.size()); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java index 27db31d..1f3f71c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java @@ -82,7 +82,7 @@ public class PojoTypeExtractionTest { public static int ignoreStaticField; public transient int ignoreTransientField; public Date date; // generic type - public Integer someNumber; // BasicType + public Integer someNumberWithÃnicödeNäme; // BasicType public float someFloat; // BasicType public Tuple3<Long, Long, String> word; //Tuple Type with three basic types public Object nothing; // generic type @@ -226,7 +226,7 @@ public class PojoTypeExtractionTest { "complex.collection", "complex.nothing", "complex.someFloat", - "complex.someNumber", + "complex.someNumberWithÃnicödeNäme", "complex.word.f0", "complex.word.f1", "complex.word.f2"}; @@ -242,13 +242,13 @@ public class PojoTypeExtractionTest { 8}; Assert.assertEquals(fields.length, positions.length); for(int i = 0; i < fields.length; i++) { - pojoType.getKey(fields[i], 0, ffd); + pojoType.getFlatFields(fields[i], 0, ffd); Assert.assertEquals("Too many keys returned", 1, ffd.size()); Assert.assertEquals("position of field "+fields[i]+" wrong", positions[i], ffd.get(0).getPosition()); ffd.clear(); } - pojoType.getKey("complex.word.*", 0, ffd); + pojoType.getFlatFields("complex.word.*", 0, ffd); Assert.assertEquals(3, ffd.size()); // check if it returns 5,6,7 for(FlatFieldDescriptor ffdE : ffd) { @@ -268,11 +268,11 @@ public class PojoTypeExtractionTest { ffd.clear(); // scala style full tuple selection for pojos - pojoType.getKey("complex.word._", 0, ffd); + pojoType.getFlatFields("complex.word._", 0, ffd); Assert.assertEquals(3, ffd.size()); ffd.clear(); - pojoType.getKey("complex.*", 0, ffd); + pojoType.getFlatFields("complex.*", 0, ffd); Assert.assertEquals(9, ffd.size()); // check if it returns 0-7 for(FlatFieldDescriptor ffdE : ffd) { @@ -313,7 +313,7 @@ public class PojoTypeExtractionTest { } ffd.clear(); - pojoType.getKey("*", 0, ffd); + pojoType.getFlatFields("*", 0, ffd); Assert.assertEquals(10, ffd.size()); // check if it returns 0-8 for(FlatFieldDescriptor ffdE : ffd) { @@ -344,7 +344,7 @@ public class PojoTypeExtractionTest { dateSeen = true; Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type); Assert.assertEquals(Date.class, field.type.getTypeClass()); - } else if(name.equals("someNumber")) { + } else if(name.equals("someNumberWithÃnicödeNäme")) { if(intSeen) { Assert.fail("already seen"); } @@ -450,7 +450,7 @@ public class PojoTypeExtractionTest { strArraySeen = true; Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type); Assert.assertEquals(String[].class, field.type.getTypeClass()); - } else if(Arrays.asList("date", "someNumber", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { + } else if(Arrays.asList("date", "someNumberWithÃnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) { // ignore these, they are inherited from the ComplexNestedClass } else { http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java index 8a2d675..bc36241 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java @@ -148,7 +148,7 @@ public class TypeExtractorTest { Assert.assertEquals(9, ti.getArity()); Assert.assertTrue(ti instanceof TupleTypeInfo); List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); - ((TupleTypeInfo) ti).getKey("f3", 0, ffd); + ((TupleTypeInfo) ti).getFlatFields("f3", 0, ffd); Assert.assertTrue(ffd.size() == 1); Assert.assertEquals(3, ffd.get(0).getPosition() ); @@ -215,16 +215,16 @@ public class TypeExtractorTest { Assert.assertTrue(ti instanceof TupleTypeInfo); List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); - ((TupleTypeInfo) ti).getKey("f0.f0", 0, ffd); + ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd); Assert.assertEquals(0, ffd.get(0).getPosition() ); ffd.clear(); - ((TupleTypeInfo) ti).getKey("f0.f0", 0, ffd); + ((TupleTypeInfo) ti).getFlatFields("f0.f0", 0, ffd); Assert.assertTrue( ffd.get(0).getType() instanceof BasicTypeInfo ); Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(String.class) ); ffd.clear(); - ((TupleTypeInfo) ti).getKey("f1.f0", 0, ffd); + ((TupleTypeInfo) ti).getFlatFields("f1.f0", 0, ffd); Assert.assertEquals(1, ffd.get(0).getPosition() ); ffd.clear(); @@ -384,19 +384,19 @@ public class TypeExtractorTest { Assert.assertEquals(Tuple2.class, tti.getTypeClass()); List<FlatFieldDescriptor> ffd = new ArrayList<FlatFieldDescriptor>(); - tti.getKey("f0", 0, ffd); + tti.getFlatFields("f0", 0, ffd); Assert.assertEquals(1, ffd.size()); Assert.assertEquals(0, ffd.get(0).getPosition() ); // Long Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(Long.class) ); ffd.clear(); - tti.getKey("f1.myField1", 0, ffd); + tti.getFlatFields("f1.myField1", 0, ffd); Assert.assertEquals(1, ffd.get(0).getPosition() ); Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(String.class) ); ffd.clear(); - tti.getKey("f1.myField2", 0, ffd); + tti.getFlatFields("f1.myField2", 0, ffd); Assert.assertEquals(2, ffd.get(0).getPosition() ); Assert.assertTrue( ffd.get(0).getType().getTypeClass().equals(Integer.class) ); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java new file mode 100644 index 0000000..c71625a --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/CompositeTypeTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import static org.junit.Assert.*; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.junit.Test; + +public class CompositeTypeTest { + + private final TupleTypeInfo<?> tupleTypeInfo = new TupleTypeInfo<Tuple4<Integer, Integer, Integer, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + + private final TupleTypeInfo<Tuple3<Integer, String, Long>> inNestedTuple1 = new TupleTypeInfo<Tuple3<Integer, String, Long>>( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + private final TupleTypeInfo<Tuple2<Double, Double>> inNestedTuple2 = new TupleTypeInfo<Tuple2<Double, Double>>( + BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO); + + private final TupleTypeInfo<?> nestedTypeInfo = new TupleTypeInfo<Tuple4<Integer, Tuple3<Integer, String, Long>, Integer, Tuple2<Double, Double>>>( + BasicTypeInfo.INT_TYPE_INFO, + inNestedTuple1, + BasicTypeInfo.INT_TYPE_INFO, + inNestedTuple2); + + private final TupleTypeInfo<Tuple2<Integer, Tuple2<Integer, Integer>>> inNestedTuple3 = new TupleTypeInfo<Tuple2<Integer, Tuple2<Integer, Integer>>>( + BasicTypeInfo.INT_TYPE_INFO, + new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); + + private final TupleTypeInfo<?> deepNestedTupleTypeInfo = new TupleTypeInfo<Tuple3<Integer, Tuple2<Integer, Tuple2<Integer, Integer>>, Integer>>( + BasicTypeInfo.INT_TYPE_INFO, + inNestedTuple3, + BasicTypeInfo.INT_TYPE_INFO ); + + private final PojoTypeInfo<?> pojoTypeInfo = ((PojoTypeInfo<?>)TypeExtractor.getForClass(MyPojo.class)); + + private final TupleTypeInfo<?> pojoInTupleTypeInfo = new TupleTypeInfo<Tuple2<Integer, MyPojo>>(BasicTypeInfo.INT_TYPE_INFO, pojoTypeInfo); + + @Test + public void testGetFlatFields() { + + assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition()); + assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition()); + assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition()); + assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition()); + assertEquals(0, tupleTypeInfo.getFlatFields("f0").get(0).getPosition()); + assertEquals(1, tupleTypeInfo.getFlatFields("f1").get(0).getPosition()); + assertEquals(2, tupleTypeInfo.getFlatFields("f2").get(0).getPosition()); + assertEquals(3, tupleTypeInfo.getFlatFields("f3").get(0).getPosition()); + + assertEquals(0, nestedTypeInfo.getFlatFields("0").get(0).getPosition()); + assertEquals(1, nestedTypeInfo.getFlatFields("1.0").get(0).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("1.1").get(0).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("1.2").get(0).getPosition()); + assertEquals(4, nestedTypeInfo.getFlatFields("2").get(0).getPosition()); + assertEquals(5, nestedTypeInfo.getFlatFields("3.0").get(0).getPosition()); + assertEquals(6, nestedTypeInfo.getFlatFields("3.1").get(0).getPosition()); + assertEquals(4, nestedTypeInfo.getFlatFields("f2").get(0).getPosition()); + assertEquals(5, nestedTypeInfo.getFlatFields("f3.f0").get(0).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("1").size()); + assertEquals(1, nestedTypeInfo.getFlatFields("1").get(0).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("1").get(1).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("1").get(2).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size()); + assertEquals(1, nestedTypeInfo.getFlatFields("1.*").get(0).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("1.*").get(1).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("1.*").get(2).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("3").size()); + assertEquals(5, nestedTypeInfo.getFlatFields("3").get(0).getPosition()); + assertEquals(6, nestedTypeInfo.getFlatFields("3").get(1).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("f1").size()); + assertEquals(1, nestedTypeInfo.getFlatFields("f1").get(0).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("f1").get(1).getPosition()); + assertEquals(3, nestedTypeInfo.getFlatFields("f1").get(2).getPosition()); + assertEquals(2, nestedTypeInfo.getFlatFields("f3").size()); + assertEquals(5, nestedTypeInfo.getFlatFields("f3").get(0).getPosition()); + assertEquals(6, nestedTypeInfo.getFlatFields("f3").get(1).getPosition()); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, + nestedTypeInfo.getFlatFields("0").get(0).getType()); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, + nestedTypeInfo.getFlatFields("1.1").get(0).getType()); + assertEquals(BasicTypeInfo.LONG_TYPE_INFO, + nestedTypeInfo.getFlatFields("1").get(2).getType()); + assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, + nestedTypeInfo.getFlatFields("3").get(1).getType()); + + assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size()); + assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition()); + assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition()); + assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition()); + assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size()); + assertEquals(0, deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition()); + assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition()); + assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition()); + assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition()); + assertEquals(4, deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition()); + + assertEquals(0, pojoTypeInfo.getFlatFields("a").get(0).getPosition()); + assertEquals(1, pojoTypeInfo.getFlatFields("b").get(0).getPosition()); + assertEquals(2, pojoTypeInfo.getFlatFields("*").size()); + assertEquals(0, pojoTypeInfo.getFlatFields("*").get(0).getPosition()); + assertEquals(1, pojoTypeInfo.getFlatFields("*").get(1).getPosition()); + + assertEquals(1, pojoInTupleTypeInfo.getFlatFields("f1.a").get(0).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("1.b").get(0).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("1").size()); + assertEquals(1, pojoInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("1").get(1).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("f1.*").size()); + assertEquals(1, pojoInTupleTypeInfo.getFlatFields("f1.*").get(0).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("f1").get(1).getPosition()); + assertEquals(3, pojoInTupleTypeInfo.getFlatFields("*").size()); + assertEquals(0, pojoInTupleTypeInfo.getFlatFields("*").get(0).getPosition()); + assertEquals(1, pojoInTupleTypeInfo.getFlatFields("*").get(1).getPosition()); + assertEquals(2, pojoInTupleTypeInfo.getFlatFields("*").get(2).getPosition()); + + } + + @Test + public void testFieldAtStringRef() { + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("0")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("2")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("f1")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("f3")); + + assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("0")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("1.0")); + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, nestedTypeInfo.getTypeAt("1.1")); + assertEquals(BasicTypeInfo.LONG_TYPE_INFO, nestedTypeInfo.getTypeAt("1.2")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("2")); + assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.0")); + assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.1")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("f2")); + assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("f3.f0")); + assertEquals(inNestedTuple1, nestedTypeInfo.getTypeAt("1")); + assertEquals(inNestedTuple2, nestedTypeInfo.getTypeAt("3")); + assertEquals(inNestedTuple1, nestedTypeInfo.getTypeAt("f1")); + assertEquals(inNestedTuple2, nestedTypeInfo.getTypeAt("f3")); + + assertEquals(inNestedTuple3, deepNestedTupleTypeInfo.getTypeAt("1")); + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, pojoTypeInfo.getTypeAt("a")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, pojoTypeInfo.getTypeAt("b")); + + assertEquals(BasicTypeInfo.STRING_TYPE_INFO, pojoInTupleTypeInfo.getTypeAt("f1.a")); + assertEquals(BasicTypeInfo.INT_TYPE_INFO, pojoInTupleTypeInfo.getTypeAt("1.b")); + assertEquals(pojoTypeInfo, pojoInTupleTypeInfo.getTypeAt("1")); + assertEquals(pojoTypeInfo, pojoInTupleTypeInfo.getTypeAt("f1")); + + } + + public static class MyPojo { + public String a; + public int b; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index 5d0917e..ae47fd3 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -187,7 +187,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te // test with a simple, string-key first. PojoTypeInfo<TestUserClass> pType = (PojoTypeInfo<TestUserClass>) type; List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>(); - pType.getKey("nestedClass.dumm2", 0, result); + pType.getFlatFields("nestedClass.dumm2", 0, result); int[] fields = new int[1]; // see below fields[0] = result.get(0).getPosition(); TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0); http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 94c5461..c1796ee 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -219,31 +219,31 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { this } - def withConstantSet(constantSets: String*) = { + def withForwardedFields(forwardedFields: String*) = { javaSet match { - case op: SingleInputUdfOperator[_, _, _] => op.withConstantSet(constantSets: _*) + case op: SingleInputUdfOperator[_, _, _] => op.withForwardedFields(forwardedFields: _*) case _ => - throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + + throw new UnsupportedOperationException("Cannot specify forwarded fields for Operator " + javaSet.toString + ".") } this } - def withConstantSetFirst(constantSets: String*) = { + def withForwardedFieldsFirst(forwardedFields: String*) = { javaSet match { - case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetFirst(constantSets: _*) + case op: TwoInputUdfOperator[_, _, _, _] => op.withForwardedFieldsFirst(forwardedFields: _*) case _ => - throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + + throw new UnsupportedOperationException("Cannot specify forwarded fields for Operator " + javaSet.toString + ".") } this } - def withConstantSetSecond(constantSets: String*) = { + def withForwardedFieldsSecond(forwardedFields: String*) = { javaSet match { - case op: TwoInputUdfOperator[_, _, _, _] => op.withConstantSetSecond(constantSets: _*) + case op: TwoInputUdfOperator[_, _, _, _] => op.withForwardedFieldsSecond(forwardedFields: _*) case _ => - throw new UnsupportedOperationException("Cannot specify constant sets on Operator " + + throw new UnsupportedOperationException("Cannot specify forwarded fields for Operator " + javaSet.toString + ".") } this
