http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index ebfe5de..e2c3527 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -18,10 +18,14 @@ package org.apache.flink.api.scala.typeutils +import java.util.regex.{Pattern, Matcher} + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.AtomicType +import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor import org.apache.flink.api.java.operators.Keys.ExpressionKeys +import org.apache.flink.api.java.typeutils.PojoTypeInfo.NamedFlatFieldDescriptor import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator} @@ -35,6 +39,18 @@ abstract class CaseClassTypeInfo[T <: Product]( val fieldNames: Seq[String]) extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) { + private val REGEX_INT_FIELD: String = "[0-9]+" + private val REGEX_STR_FIELD: String = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*" + private val REGEX_FIELD: String = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD + private val REGEX_NESTED_FIELDS: String = "(" + REGEX_FIELD + ")(\\.(.+))?" + private val REGEX_NESTED_FIELDS_WILDCARD: String = REGEX_NESTED_FIELDS + "|\\" + + ExpressionKeys.SELECT_ALL_CHAR + "|\\" + ExpressionKeys.SELECT_ALL_CHAR_SCALA + + private val PATTERN_NESTED_FIELDS: Pattern = Pattern.compile(REGEX_NESTED_FIELDS) + private val PATTERN_NESTED_FIELDS_WILDCARD: Pattern = + Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD) + private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD) + def getFieldIndices(fields: Array[String]): Array[Int] = { fields map { x => fieldNames.indexOf(x) } } @@ -68,75 +84,126 @@ abstract class CaseClassTypeInfo[T <: Product]( new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, fieldSerializers.toArray) } - override def getKey( + override def getFlatFields( fieldExpression: String, offset: Int, result: java.util.List[FlatFieldDescriptor]): Unit = { + val matcher: Matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression) + if (!matcher.matches) { + throw new InvalidFieldReferenceException("Invalid tuple field reference \"" + + fieldExpression + "\".") + } - if (fieldExpression == ExpressionKeys.SELECT_ALL_CHAR) { - var keyPosition = 0 - for (tpe <- types) { - tpe match { - case a: AtomicType[_] => - result.add(new CompositeType.FlatFieldDescriptor(offset + keyPosition, tpe)) - - case co: CompositeType[_] => - co.getKey(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result) - keyPosition += co.getTotalFields - 1 - - case _ => throw new RuntimeException(s"Unexpected key type: $tpe") - + var field: String = matcher.group(0) + if ((field == ExpressionKeys.SELECT_ALL_CHAR) || + (field == ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + var keyPosition: Int = 0 + for (fType <- fieldTypes) { + fType match { + case ct: CompositeType[_] => + ct.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result) + keyPosition += ct.getTotalFields - 1 + case _ => + result.add(new FlatFieldDescriptor(offset + keyPosition, fType)) } keyPosition += 1 } return + } else { + field = matcher.group(1) } - if (fieldExpression == null || fieldExpression.length <= 0) { - throw new IllegalArgumentException("Field expression must not be empty.") + val intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + if(intFieldMatcher.matches()) { + // convert 0-indexed integer field into 1-indexed name field + field = "_" + (Integer.valueOf(field) + 1) } - fieldExpression.split('.').toList match { - case headField :: Nil => - var fieldId = 0 - for (i <- 0 until fieldNames.length) { - fieldId += types(i).getTotalFields - 1 - - if (fieldNames(i) == headField) { - if (fieldTypes(i).isInstanceOf[CompositeType[_]]) { - throw new IllegalArgumentException( - s"The specified field '$fieldExpression' is refering to a composite type.\n" - + s"Either select all elements in this type with the " + - s"'${ExpressionKeys.SELECT_ALL_CHAR}' operator or specify a field in" + - s" the sub-type") - } - result.add(new CompositeType.FlatFieldDescriptor(offset + fieldId, fieldTypes(i))) - return + var pos = offset + val tail = matcher.group(3) + if (tail == null) { + + for (i <- 0 until fieldNames.length) { + if (field == fieldNames(i)) { + // found field + fieldTypes(i) match { + case ct: CompositeType[_] => + ct.getFlatFields("*", pos, result) + return + case _ => + result.add(new FlatFieldDescriptor(pos, fieldTypes(i))) + return } - - fieldId += 1 + } else { + // skipping over non-matching fields + pos += fieldTypes(i).getTotalFields + } + } + throw new InvalidFieldReferenceException("Unable to find field \"" + field + + "\" in type " + this + ".") + } else { + var pos = offset + for (i <- 0 until fieldNames.length) { + if (field == fieldNames(i)) { + // found field + fieldTypes(i) match { + case ct: CompositeType[_] => + ct.getFlatFields(tail, pos, result) + return + case _ => + throw new InvalidFieldReferenceException("Nested field expression \"" + tail + + "\" not possible on atomic type " + fieldTypes(i) + ".") + } + } else { + // skipping over non-matching fields + pos += fieldTypes(i).getTotalFields } - case firstField :: rest => - var fieldId = 0 - for (i <- 0 until fieldNames.length) { + } + throw new InvalidFieldReferenceException("Unable to find field \"" + field + + "\" in type " + this + ".") + } + } - if (fieldNames(i) == firstField) { - fieldTypes(i) match { - case co: CompositeType[_] => - co.getKey(rest.mkString("."), offset + fieldId, result) - return + override def getTypeAt[X](fieldExpression: String) : TypeInformation[X] = { - case _ => - throw new RuntimeException(s"Field ${fieldTypes(i)} is not a composite type.") + val matcher: Matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression) + if (!matcher.matches) { + if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) || + fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { + throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.") + } + else { + throw new InvalidFieldReferenceException("Invalid format of case class field expression \"" + + fieldExpression + "\".") + } + } - } - } + var field = matcher.group(1) + val tail = matcher.group(3) - fieldId += types(i).getTotalFields - } + val intFieldMatcher = PATTERN_INT_FIELD.matcher(field); + if(intFieldMatcher.matches()) { + // convert 0-indexed integer field into 1-indexed name field + field = "_" + (Integer.valueOf(field) + 1) } - throw new RuntimeException(s"Unable to find field $fieldExpression in type $this.") + for (i <- 0 until fieldNames.length) { + if (fieldNames(i) == field) { + if (tail == null) { + return getTypeAt(i) + } else { + fieldTypes(i) match { + case co: CompositeType[_] => + return co.getTypeAt(tail) + case _ => + throw new InvalidFieldReferenceException("Nested field expression \"" + tail + + "\" not possible on atomic type " + fieldTypes(i) + ".") + } + } + } + } + throw new InvalidFieldReferenceException("Unable to find field \"" + field + + "\" in type " + this + ".") } override def toString = clazz.getSimpleName + "(" + fieldNames.zip(types).map {
http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java index 49efb12..cd6345b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java @@ -24,8 +24,8 @@ import java.util.List; import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.test.testdata.ConnectedComponentsData; import org.apache.flink.test.util.JavaProgramTestBase; @@ -109,8 +109,8 @@ public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase } } - @ConstantFieldsFirst("0") - @ConstantFieldsSecond("0") + @ForwardedFieldsFirst("0") + @ForwardedFieldsSecond("0") public static final class MinIdAndUpdate extends RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala index 7eaf856..eecc347 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala @@ -23,9 +23,9 @@ import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.operators.{GenericDataSinkBase, SingleInputSemanticProperties} import org.apache.flink.api.common.operators.base.{JoinOperatorBase, MapOperatorBase} import org.apache.flink.api.common.operators.util.FieldSet -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst -import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond import org.junit.Test import org.apache.flink.api.scala._ @@ -46,7 +46,7 @@ class SemanticPropertiesTranslationTest { val env = ExecutionEnvironment.getExecutionEnvironment val input = env.fromElements((3L, "test", 42)) - input.map(new WildcardConstantMapper[(Long, String, Int)]).print() + input.map(new WildcardForwardMapper[(Long, String, Int)]).print() val plan = env.createProgramPlan() @@ -55,9 +55,9 @@ class SemanticPropertiesTranslationTest { val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]] val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties - val fw1: FieldSet = semantics.getForwardedField(0) - val fw2: FieldSet = semantics.getForwardedField(1) - val fw3: FieldSet = semantics.getForwardedField(2) + val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0) + val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1) + val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2) assertNotNull(fw1) assertNotNull(fw2) @@ -78,12 +78,12 @@ class SemanticPropertiesTranslationTest { * A mapper that preserves fields 0, 1, 2 of a tuple data set. */ @Test - def translateUnaryFunctionAnnotationTuples(): Unit = { + def translateUnaryFunctionAnnotationTuples1(): Unit = { try { val env = ExecutionEnvironment.getExecutionEnvironment val input = env.fromElements((3L, "test", 42)) - input.map(new IndividualConstantMapper[Long, String, Int]).print() + input.map(new IndividualForwardMapper[Long, String, Int]).print() val plan = env.createProgramPlan() @@ -92,9 +92,9 @@ class SemanticPropertiesTranslationTest { val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]] val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties - val fw1: FieldSet = semantics.getForwardedField(0) - val fw2: FieldSet = semantics.getForwardedField(1) - val fw3: FieldSet = semantics.getForwardedField(2) + val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0) + val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1) + val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2) assertNotNull(fw1) assertNotNull(fw2) @@ -112,10 +112,47 @@ class SemanticPropertiesTranslationTest { } /** + * A mapper that preserves field 1 of a tuple data set. + */ + @Test + def translateUnaryFunctionAnnotationTuples2(): Unit = { + try { + val env = ExecutionEnvironment.getExecutionEnvironment + + val input = env.fromElements((3L, "test", 42)) + input.map(new FieldTwoForwardMapper[Long, String, Int]).print() + + val plan = env.createProgramPlan() + + val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next + + val mapper: MapOperatorBase[_, _, _] = sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]] + + val semantics: SingleInputSemanticProperties = mapper.getSemanticProperties + val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0) + val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1) + val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2) + + assertNotNull(fw1) + assertNotNull(fw2) + assertNotNull(fw3) + assertTrue(fw1.size == 0) + assertTrue(fw3.size == 0) + assertTrue(fw2.contains(1)) + } catch { + case e: Exception => { + System.err.println(e.getMessage) + e.printStackTrace() + fail("Exception in test: " + e.getMessage) + } + } + } + + /** * A join that preserves tuple fields from both sides. */ @Test - def translateBinaryFunctionAnnotationTuples(): Unit = { + def translateBinaryFunctionAnnotationTuples1(): Unit = { try { val env = ExecutionEnvironment.getExecutionEnvironment @@ -132,17 +169,63 @@ class SemanticPropertiesTranslationTest { sink.getInput.asInstanceOf[JoinOperatorBase[_, _, _, _]] val semantics = join.getSemanticProperties - val fw11: FieldSet = semantics.getForwardedField1(0) - val fw12: FieldSet = semantics.getForwardedField1(1) - val fw21: FieldSet = semantics.getForwardedField2(0) - val fw22: FieldSet = semantics.getForwardedField2(1) + val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0) + val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1) + val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0) + val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1) - assertNull(fw11) - assertNull(fw21) + assertNotNull(fw11) + assertNotNull(fw21) assertNotNull(fw12) assertNotNull(fw22) + assertEquals(0, fw11.size) + assertEquals(0, fw22.size) assertTrue(fw12.contains(0)) - assertTrue(fw22.contains(1)) + assertTrue(fw21.contains(1)) + } + catch { + case e: Exception => { + System.err.println(e.getMessage) + e.printStackTrace() + fail("Exception in test: " + e.getMessage) + } + } + } + + /** + * A join that preserves tuple fields from both sides. + */ + @Test + def translateBinaryFunctionAnnotationTuples2(): Unit = { + try { + val env = ExecutionEnvironment.getExecutionEnvironment + + val input1 = env.fromElements((3L, "test")) + val input2 = env.fromElements((3L, 42)) + + input1.join(input2).where(0).equalTo(0)( + new ForwardingBasicJoin[(Long, String), (Long, Int)]).print() + + val plan = env.createProgramPlan() + val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next + + val join: JoinOperatorBase[_, _, _, _] = + sink.getInput.asInstanceOf[JoinOperatorBase[_, _, _, _]] + + val semantics = join.getSemanticProperties + val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0) + val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1) + val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0) + val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1) + + assertNotNull(fw11) + assertNotNull(fw12) + assertNotNull(fw21) + assertNotNull(fw22) + assertTrue(fw11.contains(0)) + assertTrue(fw12.contains(1)) + assertTrue(fw21.contains(2)) + assertTrue(fw22.contains(3)) } catch { case e: Exception => { @@ -155,40 +238,39 @@ class SemanticPropertiesTranslationTest { } -@ConstantFields(Array("*")) -class WildcardConstantMapper[T] extends RichMapFunction[T, T] { +@ForwardedFields(Array("*")) +class WildcardForwardMapper[T] extends RichMapFunction[T, T] { def map(value: T): T = { value } } -@ConstantFields(Array("0->0;1->1;2->2")) -class IndividualConstantMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] { +@ForwardedFields(Array("0;1;2")) +class IndividualForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] { def map(value: (X, Y, Z)): (X, Y, Z) = { value } } -@ConstantFields(Array("0")) -class ZeroConstantMapper[T] extends RichMapFunction[T, T] { - def map(value: T): T = { +@ForwardedFields(Array("_2")) +class FieldTwoForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, Z)] { + def map(value: (X, Y ,Z)): (X, Y, Z) = { value } } -@ConstantFieldsFirst(Array("1 -> 0")) -@ConstantFieldsSecond(Array("1 -> 1")) -class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B), (C, D), (B, D)] { - def join(first: (A, B), second: (C, D)): (B, D) = { - (first._2, second._2) +@ForwardedFieldsFirst(Array("_2 -> _1")) +@ForwardedFieldsSecond(Array("_1 -> _2")) +class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B), (C, D), (B, C)] { + def join(first: (A, B), second: (C, D)): (B, C) = { + (first._2, second._1) } } -@ConstantFieldsFirst(Array("0 -> 0")) -@ConstantFieldsSecond(Array("0 -> 1")) +@ForwardedFieldsFirst(Array("* -> 0.*")) +@ForwardedFieldsSecond(Array("* -> 1.*")) class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] { def join(first: A, second: B): (A, B) = { (first, second) } } - http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala index 0c25995..640890f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala @@ -39,6 +39,8 @@ class MyWritable extends Writable { case class CustomCaseClass(a: String, b: Int) +case class UmlautCaseClass(ä: String, Ã: Int) + class CustomType(var myField1: String, var myField2: Int) { def this() { this(null, 0) @@ -278,5 +280,142 @@ class TypeInformationGenTest { Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, tti.getTypeAt(7)) Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, tti.getTypeAt(8)) } + + @Test + def testGetFlatFields(): Unit = { + + val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)]. + asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]] + Assert.assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition) + Assert.assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition) + Assert.assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition) + Assert.assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition) + Assert.assertEquals(0, tupleTypeInfo.getFlatFields("_1").get(0).getPosition) + Assert.assertEquals(1, tupleTypeInfo.getFlatFields("_2").get(0).getPosition) + Assert.assertEquals(2, tupleTypeInfo.getFlatFields("_3").get(0).getPosition) + Assert.assertEquals(3, tupleTypeInfo.getFlatFields("_4").get(0).getPosition) + + val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))]. + asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]] + Assert.assertEquals(0, nestedTypeInfo.getFlatFields("0").get(0).getPosition) + Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.0").get(0).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.1").get(0).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.2").get(0).getPosition) + Assert.assertEquals(4, nestedTypeInfo.getFlatFields("2").get(0).getPosition) + Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3.0").get(0).getPosition) + Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3.1").get(0).getPosition) + Assert.assertEquals(4, nestedTypeInfo.getFlatFields("_3").get(0).getPosition) + Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").size) + Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1").get(0).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1").get(1).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").get(2).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size) + Assert.assertEquals(1, nestedTypeInfo.getFlatFields("1.*").get(0).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("1.*").get(1).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").get(2).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("3").size) + Assert.assertEquals(5, nestedTypeInfo.getFlatFields("3").get(0).getPosition) + Assert.assertEquals(6, nestedTypeInfo.getFlatFields("3").get(1).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").size) + Assert.assertEquals(1, nestedTypeInfo.getFlatFields("_2").get(0).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_2").get(1).getPosition) + Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").get(2).getPosition) + Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_4").size) + Assert.assertEquals(5, nestedTypeInfo.getFlatFields("_4").get(0).getPosition) + Assert.assertEquals(6, nestedTypeInfo.getFlatFields("_4").get(1).getPosition) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, + nestedTypeInfo.getFlatFields("0").get(0).getType) + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, + nestedTypeInfo.getFlatFields("1.1").get(0).getType) + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, + nestedTypeInfo.getFlatFields("1").get(2).getType) + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, + nestedTypeInfo.getFlatFields("3").get(1).getType) + + val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)]. + asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]] + Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size) + Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition) + Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition) + Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition) + Assert.assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size) + Assert.assertEquals(0, deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition) + Assert.assertEquals(1, deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition) + Assert.assertEquals(2, deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition) + Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition) + Assert.assertEquals(4, deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition) + + val caseClassTypeInfo = createTypeInformation[CustomCaseClass]. + asInstanceOf[CaseClassTypeInfo[CustomCaseClass]] + Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("a").get(0).getPosition) + Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("b").get(0).getPosition) + Assert.assertEquals(2, caseClassTypeInfo.getFlatFields("*").size) + Assert.assertEquals(0, caseClassTypeInfo.getFlatFields("*").get(0).getPosition) + Assert.assertEquals(1, caseClassTypeInfo.getFlatFields("*").get(1).getPosition) + + val caseClassInTupleTypeInfo = createTypeInformation[(Int, UmlautCaseClass)]. + asInstanceOf[CaseClassTypeInfo[(Int, UmlautCaseClass)]] + Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1.Ã").get(0).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").size) + Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2.*").size) + Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition) + Assert.assertEquals(3, caseClassInTupleTypeInfo.getFlatFields("*").size) + Assert.assertEquals(0, caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition) + Assert.assertEquals(1, caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition) + Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition) + + } + + @Test + def testFieldAtStringRef(): Unit = { + + val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)]. + asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]] + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("0")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("2")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_2")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tupleTypeInfo.getTypeAt("_4")) + + val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, (Double, Double))]. + asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, Double))]] + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("0")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("1.0")) + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, nestedTypeInfo.getTypeAt("1.1")) + Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, nestedTypeInfo.getTypeAt("1.2")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("2")) + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.0")) + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("3.1")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, nestedTypeInfo.getTypeAt("_3")) + Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, nestedTypeInfo.getTypeAt("_4._1")) + Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("1")) + Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("3")) + Assert.assertEquals(createTypeInformation[(Int, String, Long)], nestedTypeInfo.getTypeAt("_2")) + Assert.assertEquals(createTypeInformation[(Double, Double)], nestedTypeInfo.getTypeAt("_4")) + + val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, Int)), Int)]. + asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]] + Assert.assertEquals(createTypeInformation[(Int, (Int, Int))], + deepNestedTupleTypeInfo.getTypeAt("1")) + + val umlautCaseClassTypeInfo = createTypeInformation[UmlautCaseClass]. + asInstanceOf[CaseClassTypeInfo[UmlautCaseClass]] + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("ä")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, umlautCaseClassTypeInfo.getTypeAt("Ã")) + + val caseClassTypeInfo = createTypeInformation[CustomCaseClass]. + asInstanceOf[CaseClassTypeInfo[CustomCaseClass]] + val caseClassInTupleTypeInfo = createTypeInformation[(Int, CustomCaseClass)]. + asInstanceOf[CaseClassTypeInfo[(Int, CustomCaseClass)]] + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("_2.a")) + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, caseClassInTupleTypeInfo.getTypeAt("1.b")) + Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("1")) + Assert.assertEquals(caseClassTypeInfo, caseClassInTupleTypeInfo.getTypeAt("_2")) + + } }
