http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index ebfe5de..e2c3527 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -18,10 +18,14 @@
 
 package org.apache.flink.api.scala.typeutils
 
+import java.util.regex.{Pattern, Matcher}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.AtomicType
+import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
+import 
org.apache.flink.api.java.typeutils.PojoTypeInfo.NamedFlatFieldDescriptor
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator}
 
@@ -35,6 +39,18 @@ abstract class CaseClassTypeInfo[T <: Product](
     val fieldNames: Seq[String])
   extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
+  private val REGEX_INT_FIELD: String = "[0-9]+"
+  private val REGEX_STR_FIELD: String = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"
+  private val REGEX_FIELD: String = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD
+  private val REGEX_NESTED_FIELDS: String = "(" + REGEX_FIELD + ")(\\.(.+))?"
+  private val REGEX_NESTED_FIELDS_WILDCARD: String = REGEX_NESTED_FIELDS + 
"|\\" +
+    ExpressionKeys.SELECT_ALL_CHAR + "|\\" + 
ExpressionKeys.SELECT_ALL_CHAR_SCALA
+
+  private val PATTERN_NESTED_FIELDS: Pattern = 
Pattern.compile(REGEX_NESTED_FIELDS)
+  private val PATTERN_NESTED_FIELDS_WILDCARD: Pattern =
+    Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD)
+  private val PATTERN_INT_FIELD: Pattern = Pattern.compile(REGEX_INT_FIELD)
+
   def getFieldIndices(fields: Array[String]): Array[Int] = {
     fields map { x => fieldNames.indexOf(x) }
   }
@@ -68,75 +84,126 @@ abstract class CaseClassTypeInfo[T <: Product](
     new CaseClassComparator[T](finalLogicalKeyFields, finalComparators, 
fieldSerializers.toArray)
   }
 
-  override def getKey(
+  override def getFlatFields(
       fieldExpression: String,
       offset: Int,
       result: java.util.List[FlatFieldDescriptor]): Unit = {
+    val matcher: Matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression)
+    if (!matcher.matches) {
+      throw new InvalidFieldReferenceException("Invalid tuple field reference 
\"" +
+        fieldExpression + "\".")
+    }
 
-    if (fieldExpression == ExpressionKeys.SELECT_ALL_CHAR) {
-      var keyPosition = 0
-      for (tpe <- types) {
-        tpe match {
-          case a: AtomicType[_] =>
-            result.add(new CompositeType.FlatFieldDescriptor(offset + 
keyPosition, tpe))
-
-          case co: CompositeType[_] =>
-            co.getKey(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, 
result)
-            keyPosition += co.getTotalFields - 1
-
-          case _ => throw new RuntimeException(s"Unexpected key type: $tpe")
-
+    var field: String = matcher.group(0)
+    if ((field == ExpressionKeys.SELECT_ALL_CHAR) ||
+      (field == ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+      var keyPosition: Int = 0
+      for (fType <- fieldTypes) {
+        fType match {
+          case ct: CompositeType[_] =>
+            ct.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + 
keyPosition, result)
+            keyPosition += ct.getTotalFields - 1
+          case _ =>
+            result.add(new FlatFieldDescriptor(offset + keyPosition, fType))
         }
         keyPosition += 1
       }
       return
+    } else {
+      field = matcher.group(1)
     }
 
-    if (fieldExpression == null || fieldExpression.length <= 0) {
-      throw new IllegalArgumentException("Field expression must not be empty.")
+    val intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+    if(intFieldMatcher.matches()) {
+      // convert 0-indexed integer field into 1-indexed name field
+      field = "_" + (Integer.valueOf(field) + 1)
     }
 
-    fieldExpression.split('.').toList match {
-      case headField :: Nil =>
-        var fieldId = 0
-        for (i <- 0 until fieldNames.length) {
-          fieldId += types(i).getTotalFields - 1
-
-          if (fieldNames(i) == headField) {
-            if (fieldTypes(i).isInstanceOf[CompositeType[_]]) {
-              throw new IllegalArgumentException(
-                s"The specified field '$fieldExpression' is refering to a 
composite type.\n"
-                + s"Either select all elements in this type with the " +
-                  s"'${ExpressionKeys.SELECT_ALL_CHAR}' operator or specify a 
field in" +
-                  s" the sub-type")
-            }
-            result.add(new CompositeType.FlatFieldDescriptor(offset + fieldId, 
fieldTypes(i)))
-            return
+    var pos = offset
+    val tail = matcher.group(3)
+    if (tail == null) {
+
+      for (i <- 0 until fieldNames.length) {
+        if (field == fieldNames(i)) {
+          // found field
+          fieldTypes(i) match {
+            case ct: CompositeType[_] =>
+              ct.getFlatFields("*", pos, result)
+              return
+            case _ =>
+              result.add(new FlatFieldDescriptor(pos, fieldTypes(i)))
+              return
           }
-
-          fieldId += 1
+        } else {
+          // skipping over non-matching fields
+          pos += fieldTypes(i).getTotalFields
+        }
+      }
+      throw new InvalidFieldReferenceException("Unable to find field \"" + 
field +
+        "\" in type " + this + ".")
+    } else {
+      var pos = offset
+      for (i <- 0 until fieldNames.length) {
+        if (field == fieldNames(i)) {
+          // found field
+          fieldTypes(i) match {
+            case ct: CompositeType[_] =>
+              ct.getFlatFields(tail, pos, result)
+              return
+            case _ =>
+              throw new InvalidFieldReferenceException("Nested field 
expression \"" + tail +
+                "\" not possible on atomic type " + fieldTypes(i) + ".")
+          }
+        } else {
+          // skipping over non-matching fields
+          pos += fieldTypes(i).getTotalFields
         }
-      case firstField :: rest =>
-        var fieldId = 0
-        for (i <- 0 until fieldNames.length) {
+      }
+      throw new InvalidFieldReferenceException("Unable to find field \"" + 
field +
+        "\" in type " + this + ".")
+    }
+  }
 
-          if (fieldNames(i) == firstField) {
-            fieldTypes(i) match {
-              case co: CompositeType[_] =>
-                co.getKey(rest.mkString("."), offset + fieldId, result)
-                return
+  override def getTypeAt[X](fieldExpression: String) : TypeInformation[X] = {
 
-              case _ =>
-                throw new RuntimeException(s"Field ${fieldTypes(i)} is not a 
composite type.")
+    val matcher: Matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression)
+    if (!matcher.matches) {
+      if (fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR) ||
+        fieldExpression.startsWith(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+        throw new InvalidFieldReferenceException("Wildcard expressions are not 
allowed here.")
+      }
+      else {
+        throw new InvalidFieldReferenceException("Invalid format of case class 
field expression \""
+          + fieldExpression + "\".")
+      }
+    }
 
-            }
-          }
+    var field = matcher.group(1)
+    val tail = matcher.group(3)
 
-          fieldId += types(i).getTotalFields
-        }
+    val intFieldMatcher = PATTERN_INT_FIELD.matcher(field);
+    if(intFieldMatcher.matches()) {
+      // convert 0-indexed integer field into 1-indexed name field
+      field = "_" + (Integer.valueOf(field) + 1)
     }
 
-    throw new RuntimeException(s"Unable to find field $fieldExpression in type 
$this.")
+    for (i <- 0 until fieldNames.length) {
+      if (fieldNames(i) == field) {
+        if (tail == null) {
+          return getTypeAt(i)
+        } else {
+          fieldTypes(i) match {
+            case co: CompositeType[_] =>
+              return co.getTypeAt(tail)
+            case _ =>
+              throw new InvalidFieldReferenceException("Nested field 
expression \"" + tail +
+                "\" not possible on atomic type " + fieldTypes(i) + ".")
+          }
+        }
+      }
+    }
+    throw new InvalidFieldReferenceException("Unable to find field \"" + field 
+
+      "\" in type " + this + ".")
   }
 
   override def toString = clazz.getSimpleName + "(" + 
fieldNames.zip(types).map {

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
index 49efb12..cd6345b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -24,8 +24,8 @@ import java.util.List;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -109,8 +109,8 @@ public class CoGroupConnectedComponentsSecondITCase extends 
JavaProgramTestBase
                }
        }
 
-       @ConstantFieldsFirst("0")
-       @ConstantFieldsSecond("0")
+       @ForwardedFieldsFirst("0")
+       @ForwardedFieldsSecond("0")
        public static final class MinIdAndUpdate extends 
RichCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> 
{
                
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
index 7eaf856..eecc347 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/SemanticPropertiesTranslationTest.scala
@@ -23,9 +23,9 @@ import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.operators.{GenericDataSinkBase, 
SingleInputSemanticProperties}
 import org.apache.flink.api.common.operators.base.{JoinOperatorBase, 
MapOperatorBase}
 import org.apache.flink.api.common.operators.util.FieldSet
-import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsSecond
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond
 import org.junit.Test
 
 import org.apache.flink.api.scala._
@@ -46,7 +46,7 @@ class SemanticPropertiesTranslationTest {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new WildcardConstantMapper[(Long, String, Int)]).print()
+      input.map(new WildcardForwardMapper[(Long, String, Int)]).print()
 
       val plan = env.createProgramPlan()
 
@@ -55,9 +55,9 @@ class SemanticPropertiesTranslationTest {
       val mapper: MapOperatorBase[_, _, _] = 
sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
 
       val semantics: SingleInputSemanticProperties = 
mapper.getSemanticProperties
-      val fw1: FieldSet = semantics.getForwardedField(0)
-      val fw2: FieldSet = semantics.getForwardedField(1)
-      val fw3: FieldSet = semantics.getForwardedField(2)
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
 
       assertNotNull(fw1)
       assertNotNull(fw2)
@@ -78,12 +78,12 @@ class SemanticPropertiesTranslationTest {
    * A mapper that preserves fields 0, 1, 2 of a tuple data set.
    */
   @Test
-  def translateUnaryFunctionAnnotationTuples(): Unit = {
+  def translateUnaryFunctionAnnotationTuples1(): Unit = {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
       val input = env.fromElements((3L, "test", 42))
-      input.map(new IndividualConstantMapper[Long, String, Int]).print()
+      input.map(new IndividualForwardMapper[Long, String, Int]).print()
 
       val plan = env.createProgramPlan()
 
@@ -92,9 +92,9 @@ class SemanticPropertiesTranslationTest {
       val mapper: MapOperatorBase[_, _, _] = 
sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
 
       val semantics: SingleInputSemanticProperties = 
mapper.getSemanticProperties
-      val fw1: FieldSet = semantics.getForwardedField(0)
-      val fw2: FieldSet = semantics.getForwardedField(1)
-      val fw3: FieldSet = semantics.getForwardedField(2)
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
 
       assertNotNull(fw1)
       assertNotNull(fw2)
@@ -112,10 +112,47 @@ class SemanticPropertiesTranslationTest {
   }
 
   /**
+   * A mapper that preserves field 1 of a tuple data set.
+   */
+  @Test
+  def translateUnaryFunctionAnnotationTuples2(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input = env.fromElements((3L, "test", 42))
+      input.map(new FieldTwoForwardMapper[Long, String, Int]).print()
+
+      val plan = env.createProgramPlan()
+
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val mapper: MapOperatorBase[_, _, _] = 
sink.getInput.asInstanceOf[MapOperatorBase[_, _, _]]
+
+      val semantics: SingleInputSemanticProperties = 
mapper.getSemanticProperties
+      val fw1: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw2: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw3: FieldSet = semantics.getForwardingTargetFields(0, 2)
+
+      assertNotNull(fw1)
+      assertNotNull(fw2)
+      assertNotNull(fw3)
+      assertTrue(fw1.size == 0)
+      assertTrue(fw3.size == 0)
+      assertTrue(fw2.contains(1))
+    } catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
    * A join that preserves tuple fields from both sides.
    */
   @Test
-  def translateBinaryFunctionAnnotationTuples(): Unit = {
+  def translateBinaryFunctionAnnotationTuples1(): Unit = {
     try {
       val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -132,17 +169,63 @@ class SemanticPropertiesTranslationTest {
         sink.getInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
 
       val semantics = join.getSemanticProperties
-      val fw11: FieldSet = semantics.getForwardedField1(0)
-      val fw12: FieldSet = semantics.getForwardedField1(1)
-      val fw21: FieldSet = semantics.getForwardedField2(0)
-      val fw22: FieldSet = semantics.getForwardedField2(1)
+      val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
+      val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
 
-      assertNull(fw11)
-      assertNull(fw21)
+      assertNotNull(fw11)
+      assertNotNull(fw21)
       assertNotNull(fw12)
       assertNotNull(fw22)
+      assertEquals(0, fw11.size)
+      assertEquals(0, fw22.size)
       assertTrue(fw12.contains(0))
-      assertTrue(fw22.contains(1))
+      assertTrue(fw21.contains(1))
+    }
+    catch {
+      case e: Exception => {
+        System.err.println(e.getMessage)
+        e.printStackTrace()
+        fail("Exception in test: " + e.getMessage)
+      }
+    }
+  }
+
+  /**
+   * A join that preserves tuple fields from both sides.
+   */
+  @Test
+  def translateBinaryFunctionAnnotationTuples2(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val input1 = env.fromElements((3L, "test"))
+      val input2 = env.fromElements((3L, 42))
+
+      input1.join(input2).where(0).equalTo(0)(
+        new ForwardingBasicJoin[(Long, String), (Long, Int)]).print()
+
+      val plan = env.createProgramPlan()
+      val sink: GenericDataSinkBase[_] = plan.getDataSinks.iterator.next
+
+      val join: JoinOperatorBase[_, _, _, _] =
+        sink.getInput.asInstanceOf[JoinOperatorBase[_, _, _, _]]
+
+      val semantics = join.getSemanticProperties
+      val fw11: FieldSet = semantics.getForwardingTargetFields(0, 0)
+      val fw12: FieldSet = semantics.getForwardingTargetFields(0, 1)
+      val fw21: FieldSet = semantics.getForwardingTargetFields(1, 0)
+      val fw22: FieldSet = semantics.getForwardingTargetFields(1, 1)
+
+      assertNotNull(fw11)
+      assertNotNull(fw12)
+      assertNotNull(fw21)
+      assertNotNull(fw22)
+      assertTrue(fw11.contains(0))
+      assertTrue(fw12.contains(1))
+      assertTrue(fw21.contains(2))
+      assertTrue(fw22.contains(3))
     }
     catch {
       case e: Exception => {
@@ -155,40 +238,39 @@ class SemanticPropertiesTranslationTest {
 }
 
 
-@ConstantFields(Array("*"))
-class WildcardConstantMapper[T] extends RichMapFunction[T, T] {
+@ForwardedFields(Array("*"))
+class WildcardForwardMapper[T] extends RichMapFunction[T, T] {
   def map(value: T): T = {
     value
   }
 }
 
-@ConstantFields(Array("0->0;1->1;2->2"))
-class IndividualConstantMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, 
Y, Z)] {
+@ForwardedFields(Array("0;1;2"))
+class IndividualForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, 
Y, Z)] {
   def map(value: (X, Y, Z)): (X, Y, Z) = {
     value
   }
 }
 
-@ConstantFields(Array("0"))
-class ZeroConstantMapper[T] extends RichMapFunction[T, T] {
-  def map(value: T): T = {
+@ForwardedFields(Array("_2"))
+class FieldTwoForwardMapper[X, Y, Z] extends RichMapFunction[(X, Y, Z), (X, Y, 
Z)] {
+  def map(value: (X, Y ,Z)): (X, Y, Z) = {
     value
   }
 }
 
-@ConstantFieldsFirst(Array("1 -> 0"))
-@ConstantFieldsSecond(Array("1 -> 1"))
-class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B),  (C, 
D), (B, D)] {
-  def join(first: (A, B), second: (C, D)): (B, D) = {
-    (first._2, second._2)
+@ForwardedFieldsFirst(Array("_2 -> _1"))
+@ForwardedFieldsSecond(Array("_1 -> _2"))
+class ForwardingTupleJoin[A, B, C, D] extends RichJoinFunction[(A, B),  (C, 
D), (B, C)] {
+  def join(first: (A, B), second: (C, D)): (B, C) = {
+    (first._2, second._1)
   }
 }
 
-@ConstantFieldsFirst(Array("0 -> 0"))
-@ConstantFieldsSecond(Array("0 -> 1"))
+@ForwardedFieldsFirst(Array("* -> 0.*"))
+@ForwardedFieldsSecond(Array("* -> 1.*"))
 class ForwardingBasicJoin[A, B] extends RichJoinFunction[A, B, (A, B)] {
   def join(first: A, second: B): (A, B) = {
     (first, second)
   }
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/de8e066c/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
index 0c25995..640890f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/types/TypeInformationGenTest.scala
@@ -39,6 +39,8 @@ class MyWritable extends Writable {
 
 case class CustomCaseClass(a: String, b: Int)
 
+case class UmlautCaseClass(ä: String, ß: Int)
+
 class CustomType(var myField1: String, var myField2: Int) {
   def this() {
     this(null, 0)
@@ -278,5 +280,142 @@ class TypeInformationGenTest {
     
Assert.assertEquals(PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO, 
tti.getTypeAt(7))
     Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, 
tti.getTypeAt(8))
   }
+
+  @Test
+  def testGetFlatFields(): Unit = {
+
+    val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+    Assert.assertEquals(0, tupleTypeInfo.getFlatFields("0").get(0).getPosition)
+    Assert.assertEquals(1, tupleTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, tupleTypeInfo.getFlatFields("2").get(0).getPosition)
+    Assert.assertEquals(3, tupleTypeInfo.getFlatFields("3").get(0).getPosition)
+    Assert.assertEquals(0, 
tupleTypeInfo.getFlatFields("_1").get(0).getPosition)
+    Assert.assertEquals(1, 
tupleTypeInfo.getFlatFields("_2").get(0).getPosition)
+    Assert.assertEquals(2, 
tupleTypeInfo.getFlatFields("_3").get(0).getPosition)
+    Assert.assertEquals(3, 
tupleTypeInfo.getFlatFields("_4").get(0).getPosition)
+
+    val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, 
(Double, Double))].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, 
Double))]]
+    Assert.assertEquals(0, 
nestedTypeInfo.getFlatFields("0").get(0).getPosition)
+    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1.0").get(0).getPosition)
+    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1.1").get(0).getPosition)
+    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1.2").get(0).getPosition)
+    Assert.assertEquals(4, 
nestedTypeInfo.getFlatFields("2").get(0).getPosition)
+    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("3.0").get(0).getPosition)
+    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("3.1").get(0).getPosition)
+    Assert.assertEquals(4, 
nestedTypeInfo.getFlatFields("_3").get(0).getPosition)
+    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("_4._1").get(0).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1").get(2).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("1.*").size)
+    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("1.*").get(0).getPosition)
+    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("1.*").get(1).getPosition)
+    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("1.*").get(2).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("3").size)
+    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("3").get(0).getPosition)
+    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("3").get(1).getPosition)
+    Assert.assertEquals(3, nestedTypeInfo.getFlatFields("_2").size)
+    Assert.assertEquals(1, 
nestedTypeInfo.getFlatFields("_2").get(0).getPosition)
+    Assert.assertEquals(2, 
nestedTypeInfo.getFlatFields("_2").get(1).getPosition)
+    Assert.assertEquals(3, 
nestedTypeInfo.getFlatFields("_2").get(2).getPosition)
+    Assert.assertEquals(2, nestedTypeInfo.getFlatFields("_4").size)
+    Assert.assertEquals(5, 
nestedTypeInfo.getFlatFields("_4").get(0).getPosition)
+    Assert.assertEquals(6, 
nestedTypeInfo.getFlatFields("_4").get(1).getPosition)
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("0").get(0).getType)
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("1.1").get(0).getType)
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("1").get(2).getType)
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO,
+      nestedTypeInfo.getFlatFields("3").get(1).getType)
+
+    val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, 
Int)), Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+    Assert.assertEquals(3, deepNestedTupleTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("1").get(0).getPosition)
+    Assert.assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("1").get(2).getPosition)
+    Assert.assertEquals(5, deepNestedTupleTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, 
deepNestedTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, 
deepNestedTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+    Assert.assertEquals(2, 
deepNestedTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+    Assert.assertEquals(3, 
deepNestedTupleTypeInfo.getFlatFields("*").get(3).getPosition)
+    Assert.assertEquals(4, 
deepNestedTupleTypeInfo.getFlatFields("*").get(4).getPosition)
+
+    val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+      asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+    Assert.assertEquals(0, 
caseClassTypeInfo.getFlatFields("a").get(0).getPosition)
+    Assert.assertEquals(1, 
caseClassTypeInfo.getFlatFields("b").get(0).getPosition)
+    Assert.assertEquals(2, caseClassTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, 
caseClassTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, 
caseClassTypeInfo.getFlatFields("*").get(1).getPosition)
+
+    val caseClassInTupleTypeInfo = createTypeInformation[(Int, 
UmlautCaseClass)].
+      asInstanceOf[CaseClassTypeInfo[(Int, UmlautCaseClass)]]
+    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("_2.ä").get(0).getPosition)
+    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("1.ß").get(0).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("1").size)
+    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("1.*").get(0).getPosition)
+    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("1").get(1).getPosition)
+    Assert.assertEquals(2, caseClassInTupleTypeInfo.getFlatFields("_2.*").size)
+    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("_2.*").get(0).getPosition)
+    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("_2").get(1).getPosition)
+    Assert.assertEquals(3, caseClassInTupleTypeInfo.getFlatFields("*").size)
+    Assert.assertEquals(0, 
caseClassInTupleTypeInfo.getFlatFields("*").get(0).getPosition)
+    Assert.assertEquals(1, 
caseClassInTupleTypeInfo.getFlatFields("*").get(1).getPosition)
+    Assert.assertEquals(2, 
caseClassInTupleTypeInfo.getFlatFields("*").get(2).getPosition)
+
+  }
+
+  @Test
+  def testFieldAtStringRef(): Unit = {
+
+    val tupleTypeInfo = createTypeInformation[(Int, Int, Int, Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int)]]
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("0"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("_2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
tupleTypeInfo.getTypeAt("_4"))
+
+    val nestedTypeInfo = createTypeInformation[(Int, (Int, String, Long), Int, 
(Double, Double))].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, String, Long), Int, (Double, 
Double))]]
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("0"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.0"))
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.1"))
+    Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, 
nestedTypeInfo.getTypeAt("1.2"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("2"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.0"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("3.1"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
nestedTypeInfo.getTypeAt("_3"))
+    Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
nestedTypeInfo.getTypeAt("_4._1"))
+    Assert.assertEquals(createTypeInformation[(Int, String, Long)], 
nestedTypeInfo.getTypeAt("1"))
+    Assert.assertEquals(createTypeInformation[(Double, Double)], 
nestedTypeInfo.getTypeAt("3"))
+    Assert.assertEquals(createTypeInformation[(Int, String, Long)], 
nestedTypeInfo.getTypeAt("_2"))
+    Assert.assertEquals(createTypeInformation[(Double, Double)], 
nestedTypeInfo.getTypeAt("_4"))
+
+    val deepNestedTupleTypeInfo = createTypeInformation[(Int, (Int, (Int, 
Int)), Int)].
+      asInstanceOf[CaseClassTypeInfo[(Int, (Int, (Int, Int)), Int)]]
+    Assert.assertEquals(createTypeInformation[(Int, (Int, Int))],
+      deepNestedTupleTypeInfo.getTypeAt("1"))
+
+    val umlautCaseClassTypeInfo = createTypeInformation[UmlautCaseClass].
+      asInstanceOf[CaseClassTypeInfo[UmlautCaseClass]]
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
umlautCaseClassTypeInfo.getTypeAt("ä"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
umlautCaseClassTypeInfo.getTypeAt("ß"))
+
+    val caseClassTypeInfo = createTypeInformation[CustomCaseClass].
+      asInstanceOf[CaseClassTypeInfo[CustomCaseClass]]
+    val caseClassInTupleTypeInfo = createTypeInformation[(Int, 
CustomCaseClass)].
+      asInstanceOf[CaseClassTypeInfo[(Int, CustomCaseClass)]]
+    Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, 
caseClassInTupleTypeInfo.getTypeAt("_2.a"))
+    Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
caseClassInTupleTypeInfo.getTypeAt("1.b"))
+    Assert.assertEquals(caseClassTypeInfo, 
caseClassInTupleTypeInfo.getTypeAt("1"))
+    Assert.assertEquals(caseClassTypeInfo, 
caseClassInTupleTypeInfo.getTypeAt("_2"))
+
+  }
 }
 

Reply via email to