[FLINK-3702] FieldAccessor refactor to static factory

Closes #2094


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/870e219d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/870e219d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/870e219d

Branch: refs/heads/master
Commit: 870e219d92809df76c843906e19c7c0606529f11
Parents: 1f04542
Author: Marton Balassi <[email protected]>
Authored: Thu Nov 3 16:17:46 2016 +0100
Committer: Marton Balassi <[email protected]>
Committed: Thu Nov 24 22:22:55 2016 +0100

----------------------------------------------------------------------
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  18 -
 .../api/common/typeinfo/BasicTypeInfo.java      |  26 --
 .../InvalidFieldReferenceException.java         |  31 --
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  18 -
 .../api/common/typeinfo/TypeInformation.java    |  34 --
 .../api/common/typeutils/CompositeType.java     |  10 +
 .../flink/api/java/typeutils/FieldAccessor.java | 324 ----------------
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  37 +-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  32 +-
 .../api/java/typeutils/FieldAccessorTest.java   | 343 -----------------
 .../api/java/functions/SemanticPropUtil.java    |   2 +-
 .../flink/api/java/operator/DataSinkTest.java   |   5 +-
 .../operator/FullOuterJoinOperatorTest.java     |   3 +-
 .../operator/LeftOuterJoinOperatorTest.java     |   3 +-
 .../operator/RightOuterJoinOperatorTest.java    |   3 +-
 .../scala/typeutils/ProductFieldAccessor.java   |  75 ----
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  31 +-
 .../scala/typeutils/CaseClassTypeInfoTest.scala | 110 ------
 .../streaming/api/datastream/KeyedStream.java   |  65 ++--
 .../aggregation/ComparableAggregator.java       |   7 +-
 .../functions/aggregation/SumAggregator.java    |   7 +-
 .../streaming/util/typeutils/FieldAccessor.java | 382 +++++++++++++++++++
 .../util/typeutils/FieldAccessorFactory.java    | 242 ++++++++++++
 .../util/typeutils/FieldAccessorTest.java       | 358 +++++++++++++++++
 flink-streaming-scala/pom.xml                   |   8 +
 .../flink/streaming/api/scala/KeyedStream.scala |  49 ++-
 .../api/scala/CaseClassFieldAccessorTest.scala  | 137 +++++++
 .../streaming/runtime/DataStreamPojoITCase.java |   4 +-
 28 files changed, 1209 insertions(+), 1155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index d04e7d9..25b2850 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -122,23 +121,6 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
        }
 
        @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
-               return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
-       }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String pos, 
ExecutionConfig config) {
-               try {
-                       return new 
FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
-               } catch (NumberFormatException ex) {
-                       throw new InvalidFieldReferenceException
-                               ("A field expression on an array must be an 
integer index (that might be given as a string).");
-               }
-       }
-
-       @Override
        public boolean equals(Object obj) {
                if (obj instanceof BasicArrayTypeInfo) {
                        BasicArrayTypeInfo<?, ?> other = (BasicArrayTypeInfo<?, 
?>) obj;

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index 09efba6..e2fd74e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -58,7 +58,6 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -172,31 +171,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
                }
        }
 
-       @Override
-       @PublicEvolving
-       @SuppressWarnings("unchecked")
-       public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
-               if(pos != 0) {
-                       throw new InvalidFieldReferenceException("The " + 
((Integer) pos).toString() + ". field selected on a " +
-                               "basic type (" + this.toString() + "). A field 
expression on a basic type can only select " +
-                               "the 0th field (which means selecting the 
entire basic type).");
-               }
-               return (FieldAccessor<T, F>) new 
FieldAccessor.SimpleFieldAccessor<T>(this);
-       }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String field, 
ExecutionConfig config) {
-               try {
-                       int pos = field.equals("*") ? 0 : 
Integer.parseInt(field);
-                       return getFieldAccessor(pos, config);
-               } catch (NumberFormatException ex) {
-                       throw new InvalidFieldReferenceException("You tried to 
select the field \"" + field +
-                               "\" on a " + this.toString() + ". A field 
expression on a basic type can only be \"*\" or \"0\"" +
-                               " (both of which mean selecting the entire 
basic type).");
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
deleted file mode 100644
index 3c67c46..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeinfo;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-@PublicEvolving
-public class InvalidFieldReferenceException extends IllegalArgumentException {
-
-       private static final long serialVersionUID = 1L;
-
-       public InvalidFieldReferenceException(String s) {
-               super(s);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 2bd96d3..1c6ce00 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -40,7 +40,6 @@ import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerial
 import 
org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
 import 
org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import 
org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -139,23 +138,6 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
                return this.serializer;
        }
 
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
-               return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
-       }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String pos, 
ExecutionConfig config) {
-               try {
-                       return new 
FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
-               } catch (NumberFormatException ex) {
-                       throw new InvalidFieldReferenceException
-                               ("A field expression on an array must be an 
integer index (that might be given as a string).");
-               }
-       }
-
        /**
         * Gets the class that represents the component type.
         * @return The class of the component type.

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 7be2b68..154ceb1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -24,7 +24,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.io.Serializable;
@@ -173,39 +172,6 @@ public abstract class TypeInformation<T> implements Serializable {
        @PublicEvolving
        public abstract TypeSerializer<T> createSerializer(ExecutionConfig 
config);
 
-
-       /**
-        * Creates a {@link FieldAccessor} for the given field position, which 
can be used to get and set
-        * the specified field on instances of this type.
-        *
-        * @param pos The field position (zero-based)
-        * @param config Configuration object
-        * @param <F> The type of the field to access
-        * @return The created FieldAccessor
-        */
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config){
-               throw new InvalidFieldReferenceException("Cannot reference 
field by position on " + this.toString()
-                       + "Referencing a field by position is supported on 
tuples, case classes, and arrays. "
-                       + "Additionally, you can select the 0th field of a 
primitive/basic type (e.g. int).");
-       }
-
-       /**
-        * Creates a {@link FieldAccessor} for the field that is given by a 
field expression,
-        * which can be used to get and set the specified field on instances of 
this type.
-        *
-        * @param field The field expression
-        * @param config Configuration object
-        * @param <F> The type of the field to access
-        * @return The created FieldAccessor
-        */
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String field, 
ExecutionConfig config) {
-               throw new InvalidFieldReferenceException("Cannot reference 
field by field expression on " + this.toString()
-                               + "Field expressions are only supported on POJO 
types, tuples, and case classes. "
-                               + "(See the Flink documentation on what is 
considered a POJO.)");
-       }
-
        @Override
        public abstract String toString();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index a4230f4..4bf17ea 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -265,6 +265,16 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
        @PublicEvolving
        public abstract int getFieldIndex(String fieldName);
 
+       @PublicEvolving
+       public static class InvalidFieldReferenceException extends 
IllegalArgumentException {
+
+               private static final long serialVersionUID = 1L;
+
+               public InvalidFieldReferenceException(String s) {
+                       super(s);
+               }
+       }
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof CompositeType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
deleted file mode 100644
index 97ef31a..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.operators.Keys;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-
-/**
- * These classes encapsulate the logic of accessing a field specified by the 
user as either an index
- * or a field expression string. TypeInformation can also be requested for the 
field.
- * The position index might specify a field of a Tuple, an array, or a simple 
type (only "0th field").
- *
- * Field expressions that specify nested fields (e.g. "f1.a.foo") result in 
nested field accessors.
- * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccesor".
- * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
- */
-@PublicEvolving
-public abstract class FieldAccessor<T, F> implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       protected TypeInformation fieldType;
-
-       /**
-        * Gets the TypeInformation for the type of the field.
-        * Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
-        */
-       @SuppressWarnings("unchecked")
-       public TypeInformation<F> getFieldType() {
-               return fieldType;
-       }
-
-
-       /**
-        * Gets the value of the field (specified in the constructor) of the 
given record.
-        * @param record The record on which the field will be accessed
-        * @return The value of the field.
-        */
-       public abstract F get(T record);
-
-       /**
-        * Sets the field (specified in the constructor) of the given record to 
the given value.
-        *
-        * Warning: This might modify the original object, or might return a 
new object instance.
-        * (This is necessary, because the record might be immutable.)
-        *
-        * @param record The record to modify
-        * @param fieldValue The new value of the field
-        * @return A record that has the given field value. (this might be a 
new instance or the original)
-        */
-       public abstract T set(T record, F fieldValue);
-
-
-       // 
--------------------------------------------------------------------------------------------------
-
-
-       /**
-        * This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a
-        * field of a POJO that is itself some composite type but is not 
further decomposed)
-        */
-       public final static class SimpleFieldAccessor<T> extends 
FieldAccessor<T, T> {
-
-               private static final long serialVersionUID = 1L;
-
-               public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
-                       checkNotNull(typeInfo, "typeInfo must not be null.");
-
-                       this.fieldType = typeInfo;
-               }
-
-               @Override
-               public T get(T record) {
-                       return record;
-               }
-
-               @Override
-               public T set(T record, T fieldValue) {
-                       return fieldValue;
-               }
-       }
-
-       public final static class ArrayFieldAccessor<T, F> extends 
FieldAccessor<T, F> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final int pos;
-
-               public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
-                       if(pos < 0) {
-                               throw new InvalidFieldReferenceException("The " 
+ ((Integer) pos).toString() + ". field selected on" +
-                                       " an array, which is an invalid 
index.");
-                       }
-                       checkNotNull(typeInfo, "typeInfo must not be null.");
-
-                       this.pos = pos;
-                       this.fieldType = 
BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public F get(T record) {
-                       return (F) Array.get(record, pos);
-               }
-
-               @Override
-               public T set(T record, F fieldValue) {
-                       Array.set(record, pos, fieldValue);
-                       return record;
-               }
-       }
-
-       /**
-        * There are two versions of TupleFieldAccessor, differing in whether 
there is an other
-        * FieldAccessor nested inside. The no inner accessor version is 
probably a little faster.
-        */
-       static final class SimpleTupleFieldAccessor<T, F> extends 
FieldAccessor<T, F> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final int pos;
-
-               SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
-                       int arity = ((TupleTypeInfo)typeInfo).getArity();
-                       if(pos < 0 || pos >= arity) {
-                               throw new InvalidFieldReferenceException(
-                                       "Tried to select " + ((Integer) 
pos).toString() + ". field on \"" +
-                                       typeInfo.toString() + "\", which is an 
invalid index.");
-                       }
-                       checkNotNull(typeInfo, "typeInfo must not be null.");
-
-                       this.pos = pos;
-                       this.fieldType = 
((TupleTypeInfo)typeInfo).getTypeAt(pos);
-               }
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public F get(T record) {
-                       final Tuple tuple = (Tuple) record;
-                       return (F) tuple.getField(pos);
-               }
-
-               @Override
-               public T set(T record, F fieldValue) {
-                       final Tuple tuple = (Tuple) record;
-                       tuple.setField(fieldValue, pos);
-                       return record;
-               }
-       }
-
-       /**
-        * @param <T> The Tuple type
-        * @param <R> The field type at the first level
-        * @param <F> The field type at the innermost level
-        */
-       static final class RecursiveTupleFieldAccessor<T, R, F> extends 
FieldAccessor<T, F> {
-
-               private static final long serialVersionUID = 1L;
-
-               private final int pos;
-               private final FieldAccessor<R, F> innerAccessor;
-
-               RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> 
innerAccessor) {
-                       if(pos < 0) {
-                               throw new InvalidFieldReferenceException("Tried 
to select " + ((Integer) pos).toString() + ". field.");
-                       }
-                       checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
-
-                       this.pos = pos;
-                       this.innerAccessor = innerAccessor;
-                       this.fieldType = innerAccessor.fieldType;
-               }
-
-               @Override
-               public F get(T record) {
-                       final Tuple tuple = (Tuple) record;
-                       final R inner = tuple.getField(pos);
-                       return innerAccessor.get(inner);
-               }
-
-               @Override
-               public T set(T record, F fieldValue) {
-                       final Tuple tuple = (Tuple) record;
-                       final R inner = tuple.getField(pos);
-                       tuple.setField(innerAccessor.set(inner, fieldValue), 
pos);
-                       return record;
-               }
-       }
-
-       /**
-        * @param <T> The POJO type
-        * @param <R> The field type at the first level
-        * @param <F> The field type at the innermost level
-        */
-       static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, 
F> {
-
-               private static final long serialVersionUID = 1L;
-
-               private transient Field field;
-               private final FieldAccessor<R, F> innerAccessor;
-
-               PojoFieldAccessor(Field field, FieldAccessor<R, F> 
innerAccessor) {
-                       checkNotNull(field, "field must not be null.");
-                       checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
-
-                       this.field = field;
-                       this.innerAccessor = innerAccessor;
-                       this.fieldType = innerAccessor.fieldType;
-               }
-
-               @Override
-               public F get(T pojo) {
-                       try {
-                               @SuppressWarnings("unchecked")
-                               final R inner = (R)field.get(pojo);
-                               return innerAccessor.get(inner);
-                       } catch (IllegalAccessException iaex) {
-                               throw new RuntimeException("This should not 
happen since we call setAccesssible(true) in readObject."
-                                               + " fields: " + field + " obj: 
" + pojo);
-                       }
-               }
-
-               @Override
-               public T set(T pojo, F valueToSet) {
-                       try {
-                               @SuppressWarnings("unchecked")
-                               final R inner = (R)field.get(pojo);
-                               field.set(pojo, innerAccessor.set(inner, 
valueToSet));
-                               return pojo;
-                       } catch (IllegalAccessException iaex) {
-                               throw new RuntimeException("This should not 
happen since we call setAccesssible(true) in readObject."
-                                               + " fields: " + field + " obj: 
" + pojo);
-                       }
-               }
-
-               private void writeObject(ObjectOutputStream out)
-                               throws IOException, ClassNotFoundException {
-                       out.defaultWriteObject();
-                       FieldSerializer.serializeField(field, out);
-               }
-
-               private void readObject(ObjectInputStream in)
-                               throws IOException, ClassNotFoundException {
-                       in.defaultReadObject();
-                       field = FieldSerializer.deserializeField(in);
-               }
-       }
-
-
-       // 
--------------------------------------------------------------------------------------------------
-
-       private final static String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // 
This can start with a digit (because of Tuples)
-       private final static String REGEX_NESTED_FIELDS = 
"("+REGEX_FIELD+")(\\.(.+))?";
-       private final static String REGEX_NESTED_FIELDS_WILDCARD = 
REGEX_NESTED_FIELDS
-               +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR
-               +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA;
-
-       private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = 
Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
-
-       public static FieldExpression decomposeFieldExpression(String 
fieldExpression) {
-               Matcher matcher = 
PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
-               if(!matcher.matches()) {
-                       throw new InvalidFieldReferenceException("Invalid field 
expression \""+fieldExpression+"\".");
-               }
-
-               String head = matcher.group(0);
-               if(head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR) || 
head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
-                       throw new InvalidFieldReferenceException("No wildcards 
are allowed here.");
-               } else {
-                       head = matcher.group(1);
-               }
-
-               String tail = matcher.group(3);
-
-               return new FieldExpression(head, tail);
-       }
-
-       /**
-        * Represents a decomposition of a field expression into its first 
part, and the rest.
-        * E.g. "foo.f1.bar" is decomposed into "foo" and "f1.bar".
-        */
-       public static class FieldExpression implements Serializable {
-
-               private static final long serialVersionUID = 1L;
-
-               public String head, tail; // tail can be null, if the field 
expression had just one part
-
-               FieldExpression(String head, String tail) {
-                       this.head = head;
-                       this.tail = tail;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 72432d6..8a4fbbe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -133,7 +132,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
                //   gives only some undefined order.
                return false;
        }
-
+       
 
        @Override
        @PublicEvolving
@@ -319,39 +318,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
                return new PojoSerializer<T>(getTypeClass(), fieldSerializers, 
reflectiveFields, config);
        }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
-               
-               FieldAccessor.FieldExpression decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
-
-               // get field
-               PojoField field = null;
-               TypeInformation<?> fieldType = null;
-               for (int i = 0; i < fields.length; i++) {
-                       if (fields[i].getField().getName().equals(decomp.head)) 
{
-                               field = fields[i];
-                               fieldType = fields[i].getTypeInformation();
-                               break;
-                       }
-               }
-               if (field == null) {
-                       throw new InvalidFieldReferenceException("Unable to 
find field \""+decomp.head+"\" in type "+this+".");
-               }
-
-               if(decomp.tail == null) {
-                       @SuppressWarnings("unchecked")
-                       FieldAccessor<F,F> innerAccessor = new 
FieldAccessor.SimpleFieldAccessor<F>((TypeInformation<F>) fieldType);
-                       return new FieldAccessor.PojoFieldAccessor<T, F, 
F>(field.getField(), innerAccessor);
-               } else {
-                       @SuppressWarnings("unchecked")
-                       FieldAccessor<Object,F> innerAccessor =
-                                       
(FieldAccessor<Object,F>)fieldType.<F>getFieldAccessor(decomp.tail, config);
-                       return new FieldAccessor.PojoFieldAccessor<T, Object, 
F>(field.getField(), innerAccessor);
-               }
-       }
-
+       
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index c9a55fc..807fd54 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,10 +23,7 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 
@@ -206,34 +203,7 @@ public abstract class TupleTypeInfoBase<T> extends 
CompositeType<T> {
                TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
                return typed;
        }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(int pos, 
ExecutionConfig config) {
-               return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(pos, 
this);
-       }
-
-       @Override
-       @PublicEvolving
-       public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, 
ExecutionConfig config) {
-               FieldAccessor.FieldExpression decomp = 
FieldAccessor.decomposeFieldExpression(fieldExpression);
-               int fieldPos = this.getFieldIndex(decomp.head);
-               if (fieldPos == -1) {
-                       try {
-                               fieldPos = Integer.parseInt(decomp.head);
-                       } catch (NumberFormatException ex) {
-                               throw new InvalidFieldReferenceException("Tried 
to select field \"" + decomp.head
-                                       + "\" on " + this.toString());
-                       }
-               }
-               if (decomp.tail == null) {
-                       return new FieldAccessor.SimpleTupleFieldAccessor<T, 
F>(fieldPos, this);
-               } else {
-                       FieldAccessor<?, F> innerAccessor = 
getTypeAt(fieldPos).getFieldAccessor(decomp.tail, config);
-                       return new 
FieldAccessor.RecursiveTupleFieldAccessor<>(fieldPos, innerAccessor);
-               }
-       }
-
+       
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof TupleTypeInfoBase) {

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
deleted file mode 100644
index f780447..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.junit.Test;
-
-public class FieldAccessorTest {
-
-       // Note, that AggregationFunctionTest indirectly also tests 
FieldAccessors.
-       // ProductFieldAccessor is tested in CaseClassTypeInfoTest.
-
-       @Test
-       public void testFlatTuple() {
-               Tuple2<String, Integer> t = Tuple2.of("aa", 5);
-               TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
-                               (TupleTypeInfo<Tuple2<String, Integer>>) 
TypeExtractor.getForObject(t);
-
-               FieldAccessor<Tuple2<String, Integer>, String> f0 = 
tpeInfo.getFieldAccessor("f0", null);
-               assertEquals("aa", f0.get(t));
-               assertEquals("aa", t.f0);
-               t = f0.set(t, "b");
-               assertEquals("b", f0.get(t));
-               assertEquals("b", t.f0);
-
-               FieldAccessor<Tuple2<String, Integer>, Integer> f1 = 
tpeInfo.getFieldAccessor("f1", null);
-               assertEquals(5, (int) f1.get(t));
-               assertEquals(5, (int) t.f1);
-               t = f1.set(t, 7);
-               assertEquals(7, (int) f1.get(t));
-               assertEquals(7, (int) t.f1);
-               assertEquals("b", f0.get(t));
-               assertEquals("b", t.f0);
-
-
-               FieldAccessor<Tuple2<String, Integer>, Integer> f1n = 
tpeInfo.getFieldAccessor(1, null);
-               assertEquals(7, (int) f1n.get(t));
-               assertEquals(7, (int) t.f1);
-               t = f1n.set(t, 10);
-               assertEquals(10, (int) f1n.get(t));
-               assertEquals(10, (int) f1.get(t));
-               assertEquals(10, (int) t.f1);
-               assertEquals("b", f0.get(t));
-               assertEquals("b", t.f0);
-
-               FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = 
tpeInfo.getFieldAccessor("1", null);
-               assertEquals(10, (int) f1ns.get(t));
-               assertEquals(10, (int) t.f1);
-               t = f1ns.set(t, 11);
-               assertEquals(11, (int) f1ns.get(t));
-               assertEquals(11, (int) f1.get(t));
-               assertEquals(11, (int) t.f1);
-               assertEquals("b", f0.get(t));
-               assertEquals("b", t.f0);
-
-               // This is technically valid (the ".0" is selecting the 0th 
field of a basic type).
-               FieldAccessor<Tuple2<String, Integer>, String> f0_0 = 
tpeInfo.getFieldAccessor("f0.0", null);
-               assertEquals("b", f0_0.get(t));
-               assertEquals("b", t.f0);
-               t = f0_0.set(t, "cc");
-               assertEquals("cc", f0_0.get(t));
-               assertEquals("cc", t.f0);
-
-               try {
-                       FieldAccessor<Tuple2<String, Integer>, String> bad = 
tpeInfo.getFieldAccessor("almafa", null);
-                       assertFalse("Expected exception, because of bad field 
name", false);
-               } catch (InvalidFieldReferenceException ex) {
-                       // OK
-               }
-       }
-
-       @Test
-       public void testTupleInTuple() {
-               Tuple2<String, Tuple3<Integer, Long, Double>> t = 
Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
-               TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> 
tpeInfo =
-                               (TupleTypeInfo<Tuple2<String, Tuple3<Integer, 
Long, Double>>>)TypeExtractor.getForObject(t);
-
-               FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
String> f0 = tpeInfo.getFieldAccessor("f0", null);
-               assertEquals("aa", f0.get(t));
-               assertEquals("aa", t.f0);
-
-               FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
-               assertEquals(2.0, f1f2.get(t), 0);
-               assertEquals(2.0, t.f1.f2, 0);
-               t = f1f2.set(t, 3.0);
-               assertEquals(3.0, f1f2.get(t), 0);
-               assertEquals(3.0, t.f1.f2, 0);
-               assertEquals("aa", f0.get(t));
-               assertEquals("aa", t.f0);
-
-               FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
-               assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
-               assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
-               t = f1.set(t, Tuple3.of(8, 12L, 4.0));
-               assertEquals(Tuple3.of(8, 12L, 4.0), f1.get(t));
-               assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
-               assertEquals("aa", f0.get(t));
-               assertEquals("aa", t.f0);
-
-               FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, 
Tuple3<Integer, Long, Double>> f1n = tpeInfo.getFieldAccessor(1, null);
-               assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t));
-               assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
-               t = f1n.set(t, Tuple3.of(10, 13L, 5.0));
-               assertEquals(Tuple3.of(10, 13L, 5.0), f1n.get(t));
-               assertEquals(Tuple3.of(10, 13L, 5.0), f1.get(t));
-               assertEquals(Tuple3.of(10, 13L, 5.0), t.f1);
-               assertEquals("aa", f0.get(t));
-               assertEquals("aa", t.f0);
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTupleFieldAccessorOutOfBounds() {
-               try {
-                       TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, 
Integer.class).getFieldAccessor(2, null);
-                       fail();
-               } catch (InvalidFieldReferenceException e) {
-                       // Nothing to do here
-               }
-       }
-
-       public static class Foo {
-               public int x;
-               public Tuple2<String, Long> t;
-               public Short y;
-
-               public Foo() {}
-
-               public Foo(int x, Tuple2<String, Long> t, Short y) {
-                       this.x = x;
-                       this.t = t;
-                       this.y = y;
-               }
-       }
-
-       @Test
-       public void testTupleInPojoInTuple() {
-               Tuple2<String, Foo> t = Tuple2.of("aa", new Foo(8, 
Tuple2.of("ddd", 9L), (short) 2));
-               TupleTypeInfo<Tuple2<String, Foo>> tpeInfo =
-                               (TupleTypeInfo<Tuple2<String, 
Foo>>)TypeExtractor.getForObject(t);
-
-               FieldAccessor<Tuple2<String, Foo>, Long> f1tf1 = 
tpeInfo.getFieldAccessor("f1.t.f1", null);
-               assertEquals(9L, (long) f1tf1.get(t));
-               assertEquals(9L, (long) t.f1.t.f1);
-               t = f1tf1.set(t, 12L);
-               assertEquals(12L, (long) f1tf1.get(t));
-               assertEquals(12L, (long) t.f1.t.f1);
-
-               FieldAccessor<Tuple2<String, Foo>, String> f1tf0 = 
tpeInfo.getFieldAccessor("f1.t.f0", null);
-               assertEquals("ddd", f1tf0.get(t));
-               assertEquals("ddd", t.f1.t.f0);
-               t = f1tf0.set(t, "alma");
-               assertEquals("alma", f1tf0.get(t));
-               assertEquals("alma", t.f1.t.f0);
-
-               FieldAccessor<Tuple2<String, Foo>, Foo> f1 = 
tpeInfo.getFieldAccessor("f1", null);
-               FieldAccessor<Tuple2<String, Foo>, Foo> f1n = 
tpeInfo.getFieldAccessor(1, null);
-               assertEquals(Tuple2.of("alma", 12L), f1.get(t).t);
-               assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t);
-               assertEquals(Tuple2.of("alma", 12L), t.f1.t);
-               Foo newFoo = new Foo(8, Tuple2.of("ddd", 9L), (short) 2);
-               f1.set(t, newFoo);
-               assertEquals(newFoo, f1.get(t));
-               assertEquals(newFoo, f1n.get(t));
-               assertEquals(newFoo, t.f1);
-       }
-
-
-       public static class Inner {
-               public long x;
-               public boolean b;
-
-               public Inner(){}
-
-               public Inner(long x) {
-                       this.x = x;
-               }
-
-               public Inner(long x, boolean b) {
-                       this.x = x;
-                       this.b = b;
-               }
-
-               @Override
-               public String toString() {
-                       return ((Long)x).toString() + ", " + b;
-               }
-       }
-
-       public static class Outer {
-               public int a;
-               public Inner i;
-               public short b;
-
-               public Outer(){}
-
-               public Outer(int a, Inner i, short b) {
-                       this.a = a;
-                       this.i = i;
-                       this.b = b;
-               }
-
-               @Override
-               public String toString() {
-                       return a+", "+i.toString()+", "+b;
-               }
-       }
-
-       @Test
-       public void testPojoInPojo() {
-               Outer o = new Outer(10, new Inner(4L), (short)12);
-               PojoTypeInfo<Outer> tpeInfo = 
(PojoTypeInfo<Outer>)TypeInformation.of(Outer.class);
-
-               FieldAccessor<Outer, Long> fix = 
tpeInfo.getFieldAccessor("i.x", null);
-               assertEquals(4L, (long) fix.get(o));
-               assertEquals(4L, o.i.x);
-               o = fix.set(o, 22L);
-               assertEquals(22L, (long) fix.get(o));
-               assertEquals(22L, o.i.x);
-
-               FieldAccessor<Outer, Inner> fi = tpeInfo.getFieldAccessor("i", 
null);
-               assertEquals(22L, fi.get(o).x);
-               assertEquals(22L, (long) fix.get(o));
-               assertEquals(22L, o.i.x);
-               o = fi.set(o, new Inner(30L));
-               assertEquals(30L, fi.get(o).x);
-               assertEquals(30L, (long) fix.get(o));
-               assertEquals(30L, o.i.x);
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testArray() {
-               int[] a = new int[]{3,5};
-               FieldAccessor<int[], Integer> fieldAccessor =
-                               (FieldAccessor<int[], Integer>) (Object)
-                                               
PrimitiveArrayTypeInfo.getInfoFor(a.getClass()).getFieldAccessor(1, null);
-
-               assertEquals(Integer.class, 
fieldAccessor.getFieldType().getTypeClass());
-
-               assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-               a = fieldAccessor.set(a, 6);
-               assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-
-
-               Integer[] b = new Integer[]{3,5};
-               FieldAccessor<Integer[], Integer> fieldAccessor2 =
-                               (FieldAccessor<Integer[], Integer>) (Object)
-                                               
BasicArrayTypeInfo.getInfoFor(b.getClass()).getFieldAccessor(1, null);
-
-               assertEquals(Integer.class, 
fieldAccessor2.getFieldType().getTypeClass());
-
-               assertEquals(b[1], fieldAccessor2.get(b));
-
-               b = fieldAccessor2.set(b, 6);
-               assertEquals(b[1], fieldAccessor2.get(b));
-       }
-
-       public static class ArrayInPojo {
-               public long x;
-               public int[] arr;
-               public int y;
-
-               public ArrayInPojo() {}
-
-               public ArrayInPojo(long x, int[] arr, int y) {
-                       this.x = x;
-                       this.arr = arr;
-                       this.y = y;
-               }
-       }
-
-       @Test
-       public void testArrayInPojo() {
-               ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12);
-               PojoTypeInfo<ArrayInPojo> tpeInfo = 
(PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class);
-
-               FieldAccessor<ArrayInPojo, Integer> fix = 
tpeInfo.getFieldAccessor("arr.1", null);
-               assertEquals(4, (int) fix.get(o));
-               assertEquals(4L, o.arr[1]);
-               o = fix.set(o, 8);
-               assertEquals(8, (int) fix.get(o));
-               assertEquals(8, o.arr[1]);
-       }
-
-       @Test
-       public void testBasicType() {
-               Long x = 7L;
-               TypeInformation<Long> tpeInfo = BasicTypeInfo.LONG_TYPE_INFO;
-
-               try {
-                       FieldAccessor<Long, Long> f = 
tpeInfo.getFieldAccessor(1, null);
-                       assertFalse("Expected exception, because not the 0th 
field selected for a basic type.", false);
-               } catch (InvalidFieldReferenceException ex) {
-                       // OK
-               }
-
-               try {
-                       FieldAccessor<Long, Long> f = 
tpeInfo.getFieldAccessor("foo", null);
-                       assertFalse("Expected exception, because not the 0th 
field selected for a basic type.", false);
-               } catch (InvalidFieldReferenceException ex) {
-                       // OK
-               }
-
-               FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(0, null);
-               assertEquals(7L, (long) f.get(x));
-               x = f.set(x, 12L);
-               assertEquals(12L, (long) f.get(x));
-               assertEquals(12L, (long) x);
-
-               FieldAccessor<Long, Long> f2 = tpeInfo.getFieldAccessor("*", 
null);
-               assertEquals(12L, (long) f2.get(x));
-               x = f2.set(x, 14L);
-               assertEquals(14L, (long) f2.get(x));
-               assertEquals(14L, (long) x);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index 4a0b2fc..aedba15 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -27,8 +27,8 @@ import 
org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import 
org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 0da417b..0493583 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.java.operator;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -167,7 +166,7 @@ public class DataSinkTest {
                        .sortLocalOutput(5, Order.DESCENDING);
        }
 
-       @Test(expected = InvalidFieldReferenceException.class)
+       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testFailTupleInv() {
 
                final ExecutionEnvironment env = ExecutionEnvironment
@@ -285,7 +284,7 @@ public class DataSinkTest {
                        .sortLocalOutput(1, Order.DESCENDING);
        }
 
-       @Test(expected = InvalidFieldReferenceException.class)
+       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testFailPojoInvalidField() {
 
                final ExecutionEnvironment env = ExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index 9f5cfb2..9f2aa41 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -136,7 +135,7 @@ public class FullOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = InvalidFieldReferenceException.class)
+       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testFullOuter8() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
index 914c75c..bfcc3e8 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -137,7 +136,7 @@ public class LeftOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = InvalidFieldReferenceException.class)
+       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testLeftOuter8() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
index f5d8129..709d830 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -136,7 +135,7 @@ public class RightOuterJoinOperatorTest {
                                .with(new DummyJoin());
        }
 
-       @Test(expected = InvalidFieldReferenceException.class)
+       @Test(expected = CompositeType.InvalidFieldReferenceException.class)
        public void testRightOuter8() {
                final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = 
env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
 
b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
deleted file mode 100644
index 0be6f33..0000000
--- 
a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import scala.Product;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-public final class ProductFieldAccessor<T, R, F> extends FieldAccessor<T, F> {
-
-       private static final long serialVersionUID = 1L;
-
-       private final int pos;
-       private final TupleSerializerBase<T> serializer;
-       private final Object[] fields;
-       private final int length;
-       private final FieldAccessor<R, F> innerAccessor;
-
-       ProductFieldAccessor(int pos, TypeInformation<T> typeInfo, 
FieldAccessor<R, F> innerAccessor, ExecutionConfig config) {
-               int arity = ((TupleTypeInfoBase)typeInfo).getArity();
-               if(pos < 0 || pos >= arity) {
-                       throw new InvalidFieldReferenceException(
-                               "Tried to select " + ((Integer) pos).toString() 
+ ". field on \"" +
-                                       typeInfo.toString() + "\", which is an 
invalid index.");
-               }
-               checkNotNull(typeInfo, "typeInfo must not be null.");
-               checkNotNull(innerAccessor, "innerAccessor must not be null.");
-
-               this.pos = pos;
-               this.fieldType = 
((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos);
-               this.serializer = 
(TupleSerializerBase<T>)typeInfo.createSerializer(config);
-               this.length = this.serializer.getArity();
-               this.fields = new Object[this.length];
-               this.innerAccessor = innerAccessor;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public F get(T record) {
-               return 
innerAccessor.get((R)((Product)record).productElement(pos));
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public T set(T record, F fieldValue) {
-               Product prod = (Product)record;
-               for (int i = 0; i < length; i++) {
-                       fields[i] = prod.productElement(i);
-               }
-               fields[pos] = innerAccessor.set((R)fields[pos], fieldValue);
-               return serializer.createInstance(fields);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index d970dfd..70ca412 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -25,12 +25,10 @@ import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import 
org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, 
TypeComparatorBuilder}
+import 
org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, 
InvalidFieldReferenceException, TypeComparatorBuilder}
 import org.apache.flink.api.common.typeutils._
 import Keys.ExpressionKeys
-import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException
-import org.apache.flink.api.java.typeutils.FieldAccessor.SimpleFieldAccessor
-import org.apache.flink.api.java.typeutils.{FieldAccessor, TupleTypeInfoBase}
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -239,31 +237,6 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
-  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): 
FieldAccessor[T, F] = {
-    new ProductFieldAccessor[T,F,F](
-      pos, this, new 
SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
-  }
-
-  override def getFieldAccessor[F](fieldExpression: String, config: 
ExecutionConfig):
-    FieldAccessor[T, F] = {
-
-    val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
-
-    val pos = getFieldIndex(decomp.head)
-    if(pos < 0) {
-      throw new InvalidFieldReferenceException("Invalid field selected: " + 
fieldExpression)
-    }
-    val fieldType = types(pos)
-
-    if (decomp.tail == null) {
-      getFieldAccessor(pos, config)
-    } else {
-      val innerAccessor =
-        fieldType.getFieldAccessor[F](decomp.tail, 
config).asInstanceOf[FieldAccessor[AnyRef, F]]
-      new ProductFieldAccessor[T,Object,F](pos, this, innerAccessor, config)
-    }
-  }
-
   override def toString: String = {
     clazz.getName + "(" + fieldNames.zip(types).map {
       case (n, t) => n + ": " + t

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
index a9abea1..479483f 100644
--- 
a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
+++ 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
@@ -21,11 +21,9 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.typeutils.FieldAccessorTest
 import org.apache.flink.util.TestLogger
 import org.junit.Test
 import org.scalatest.junit.JUnitSuiteLike
-import org.apache.flink.api.scala._
 
 class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
 
@@ -72,112 +70,4 @@ class CaseClassTypeInfoTest extends TestLogger with 
JUnitSuiteLike {
     assert(!tpeInfo1.equals(tpeInfo2))
   }
 
-  @Test
-  def testFieldAccessorFlatCaseClass(): Unit = {
-    case class IntBoolean(foo: Int, bar: Boolean)
-    val tpeInfo = createTypeInformation[IntBoolean]
-
-    {
-      // by field name
-      val accessor1 = tpeInfo.getFieldAccessor[Int]("foo", null)
-      val accessor2 = tpeInfo.getFieldAccessor[Boolean]("bar", null)
-
-      val x1 = IntBoolean(5, false)
-      assert(accessor1.get(x1) == 5)
-      assert(accessor2.get(x1) == false)
-      assert(x1.foo == 5)
-      assert(x1.bar == false)
-
-      val x2: IntBoolean = accessor1.set(x1, 6)
-      assert(accessor1.get(x2) == 6)
-      assert(x2.foo == 6)
-
-      val x3 = accessor2.set(x2, true)
-      assert(x3.bar == true)
-      assert(accessor2.get(x3) == true)
-      assert(x3.foo == 6)
-    }
-
-    {
-      // by field pos
-      val accessor1 = tpeInfo.getFieldAccessor[Int](0, null)
-      val accessor2 = tpeInfo.getFieldAccessor[Boolean](1, null)
-
-      val x1 = IntBoolean(5, false)
-      assert(accessor1.get(x1) == 5)
-      assert(accessor2.get(x1) == false)
-      assert(x1.foo == 5)
-      assert(x1.bar == false)
-
-      val x2: IntBoolean = accessor1.set(x1, 6)
-      assert(accessor1.get(x2) == 6)
-      assert(x2.foo == 6)
-
-      val x3 = accessor2.set(x2, true)
-      assert(x3.bar == true)
-      assert(accessor2.get(x3) == true)
-      assert(x3.foo == 6)
-    }
-  }
-
-  @Test
-  def testFieldAccessorTuple(): Unit = {
-    val tpeInfo = createTypeInformation[(Int, Long)]
-    var x = (5, 6L)
-    val f0 = tpeInfo.getFieldAccessor[Int](0, null)
-    assert(f0.get(x) == 5)
-    x = f0.set(x, 8)
-    assert(f0.get(x) == 8)
-    assert(x._1 == 8)
-  }
-
-  @Test
-  def testFieldAccessorCaseClassInCaseClass(): Unit = {
-    case class Inner(a: Short, b: String)
-    case class Outer(a: Int, i: Inner, b: Boolean)
-    val tpeInfo = createTypeInformation[Outer]
-
-    var x = Outer(1, Inner(2, "alma"), true)
-
-    val fib = tpeInfo.getFieldAccessor[String]("i.b", null)
-    assert(fib.get(x) == "alma")
-    assert(x.i.b == "alma")
-    x = fib.set(x, "korte")
-    assert(fib.get(x) == "korte")
-    assert(x.i.b == "korte")
-
-    val fi = tpeInfo.getFieldAccessor[Inner]("i", null)
-    assert(fi.get(x) == Inner(2, "korte"))
-    x = fi.set(x, Inner(3, "aaa"))
-    assert(x.i == Inner(3, "aaa"))
-  }
-
-  @Test
-  def testFieldAccessorPojoInCaseClass(): Unit = {
-    case class Outer(a: Int, i: FieldAccessorTest.Inner, b: Boolean)
-    var x = Outer(1, new FieldAccessorTest.Inner(3L, true), false)
-    val tpeInfo = createTypeInformation[Outer]
-    val cfg = new ExecutionConfig
-
-    val fib = tpeInfo.getFieldAccessor[Boolean]("i.b", cfg)
-    assert(fib.get(x) == true)
-    assert(x.i.b == true)
-    x = fib.set(x, false)
-    assert(fib.get(x) == false)
-    assert(x.i.b == false)
-
-    val fi = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner]("i", cfg)
-    assert(fi.get(x).x == 3L)
-    assert(x.i.x == 3L)
-    x = fi.set(x, new FieldAccessorTest.Inner(4L, true))
-    assert(fi.get(x).x == 4L)
-    assert(x.i.x == 4L)
-
-    val fin = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner](1, cfg)
-    assert(fin.get(x).x == 4L)
-    assert(x.i.x == 4L)
-    x = fin.set(x, new FieldAccessorTest.Inner(5L, true))
-    assert(fin.get(x).x == 5L)
-    assert(x.i.x == 5L)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 264d5d0..5b00bcd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -384,9 +384,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> sum(String field) {
@@ -400,8 +399,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *
         * @param positionToMin
         *            The field position in the data points to minimize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> min(int positionToMin) {
@@ -422,9 +421,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> min(String field) {
@@ -438,9 +436,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * per key.
         *
         * @param positionToMax
-        *            The field position in the data points to maximize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            The field position in the data points to maximize. This 
is applicable to
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> max(int positionToMax) {
@@ -461,9 +459,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> max(String field) {
@@ -484,9 +481,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @param first
         *            If True then in case of field equality the first object 
will
         *            be returned
@@ -511,9 +507,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @param first
         *            If True then in case of field equality the first object 
will
         *            be returned
@@ -532,8 +527,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *
         * @param positionToMinBy
         *            The field position in the data points to minimize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
@@ -551,9 +546,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
@@ -569,8 +563,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *
         * @param positionToMinBy
         *            The field position in the data points to minimize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @param first
         *            If true, then the operator return the first element with 
the
         *            minimal value, otherwise returns the last
@@ -588,9 +582,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * given position, the operator returns the first one by default.
         *
         * @param positionToMaxBy
-        *            The field position in the data points to maximize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            The field position in the data points to maximize. This 
is applicable to
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
@@ -608,9 +602,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         *            name of the (public) field on which to perform the 
aggregation.
         *            Additionally, a dot can be used to drill down into nested
         *            objects, as in {@code "field1.fieldxy" }.
-        *            Furthermore, an array index can also be specified in case 
of an array of
-        *            a primitive or basic type; or "0" or "*" can be specified 
in case of a
-        *            basic type (which is considered as having only one field).
+        *            Furthermore "*" can be specified in case of a basic type
+        *            (which is considered as having only one field).
         * @return The transformed DataStream.
         */
        public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
@@ -625,9 +618,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * depending on the parameter set.
         *
         * @param positionToMaxBy
-        *            The field position in the data points to maximize. This 
is applicable to
-        *            Tuple types, basic and primitive array types, Scala case 
classes,
-        *            and primitive types (which is considered as having one 
field).
+        *            The field position in the data points to maximize. This 
is applicable to
+        *            Tuple types, Scala case classes, and primitive types 
(which is considered
+        *            as having one field).
         * @param first
         *            If true, then the operator return the first element with 
the
         *            maximum value, otherwise returns the last

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
index 465548e..c634434 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.api.functions.aggregation;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.streaming.util.typeutils.FieldAccessor;
+import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory;
 
 @Internal
 public class ComparableAggregator<T> extends AggregationFunction<T> {
@@ -51,7 +52,7 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
                        AggregationType aggregationType,
                        boolean first,
                        ExecutionConfig config) {
-               this(aggregationType, 
typeInfo.getFieldAccessor(positionToAggregate, config), first);
+               this(aggregationType, 
FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
        }
 
        public ComparableAggregator(String field,
@@ -59,7 +60,7 @@ public class ComparableAggregator<T> extends 
AggregationFunction<T> {
                        AggregationType aggregationType,
                        boolean first,
                        ExecutionConfig config) {
-               this(aggregationType, typeInfo.getFieldAccessor(field,config), 
first);
+               this(aggregationType, 
FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
index 90d5e74..5e1378e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java
@@ -22,8 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.streaming.util.typeutils.FieldAccessor;
 
 @Internal
 public class SumAggregator<T> extends AggregationFunction<T> {
@@ -36,7 +37,7 @@ public class SumAggregator<T> extends AggregationFunction<T> {
        private final boolean isTuple;
 
        public SumAggregator(int pos, TypeInformation<T> typeInfo, 
ExecutionConfig config) {
-               fieldAccessor = typeInfo.getFieldAccessor(pos, config);
+               fieldAccessor = FieldAccessorFactory.getAccessor(typeInfo, pos, 
config);
                adder = 
SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
                if (typeInfo instanceof TupleTypeInfo) {
                        isTuple = true;
@@ -48,7 +49,7 @@ public class SumAggregator<T> extends AggregationFunction<T> {
        }
 
        public SumAggregator(String field, TypeInformation<T> typeInfo, 
ExecutionConfig config) {
-               fieldAccessor = typeInfo.getFieldAccessor(field, config);
+               fieldAccessor = FieldAccessorFactory.getAccessor(typeInfo, 
field, config);
                adder = 
SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass());
                if (typeInfo instanceof TupleTypeInfo) {
                        isTuple = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
new file mode 100644
index 0000000..2828308
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the 
user as either an index
+ * or a field expression string. TypeInformation can also be requested for the 
field.
+ * The position index might specify a field of a Tuple, an array, or a simple 
type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in 
nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an 
"innerAccessor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, 
RecursiveProductFieldAccessor)
+ */
+@Internal
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       protected TypeInformation fieldType;
+
+       /**
+        * Gets the TypeInformation for the type of the field.
+        * Note: For an array of a primitive type, it returns the corresponding 
basic type (Integer for int[]).
+        */
+       @SuppressWarnings("unchecked")
+       public TypeInformation<F> getFieldType() {
+               return fieldType;
+       }
+
+
+       /**
+        * Gets the value of the field (specified in the constructor) of the 
given record.
+        * @param record The record on which the field will be accessed
+        * @return The value of the field.
+        */
+       public abstract F get(T record);
+
+       /**
+        * Sets the field (specified in the constructor) of the given record to 
the given value.
+        *
+        * Warning: This might modify the original object, or might return a 
new object instance.
+        * (This is necessary, because the record might be immutable.)
+        *
+        * @param record The record to modify
+        * @param fieldValue The new value of the field
+        * @return A record that has the given field value. (this might be a 
new instance or the original)
+        */
+       public abstract T set(T record, F fieldValue);
+
+
+       // 
--------------------------------------------------------------------------------------------------
+
+
+       /**
+        * This is when the entire record is considered as a single field. (e.g. 
field 0 of a basic type, or a
+        * field of a POJO that is itself some composite type but is not 
further decomposed)
+        */
+       final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+
+                       this.fieldType = typeInfo;
+               }
+
+               @Override
+               public T get(T record) {
+                       return record;
+               }
+
+               @Override
+               public T set(T record, T fieldValue) {
+                       return fieldValue;
+               }
+       }
+
+       final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> 
{
+
+               private static final long serialVersionUID = 1L;
+
+               private final int pos;
+
+               public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
+                       if(pos < 0) {
+                               throw new 
CompositeType.InvalidFieldReferenceException("The " + ((Integer) 
pos).toString() + ". field selected on" +
+                                       " an array, which is an invalid 
index.");
+                       }
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+
+                       this.pos = pos;
+                       this.fieldType = 
BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public F get(T record) {
+                       return (F) Array.get(record, pos);
+               }
+
+               @Override
+               public T set(T record, F fieldValue) {
+                       Array.set(record, pos, fieldValue);
+                       return record;
+               }
+       }
+
+       /**
+        * There are two versions of TupleFieldAccessor, differing in whether 
there is another
+        * FieldAccessor nested inside. The no inner accessor version is 
probably a little faster.
+        */
+       static final class SimpleTupleFieldAccessor<T extends Tuple, F> extends 
FieldAccessor<T, F> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final int pos;
+
+               SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+                       int arity = ((TupleTypeInfo)typeInfo).getArity();
+                       if(pos < 0 || pos >= arity) {
+                               throw new 
CompositeType.InvalidFieldReferenceException(
+                                       "Tried to select " + ((Integer) 
pos).toString() + ". field on \"" +
+                                       typeInfo.toString() + "\", which is an 
invalid index.");
+                       }
+
+                       this.pos = pos;
+                       this.fieldType = 
((TupleTypeInfo)typeInfo).getTypeAt(pos);
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public F get(T record) {
+                       return (F) record.getField(pos);
+               }
+
+               @Override
+               public T set(T record, F fieldValue) {
+                       record.setField(fieldValue, pos);
+                       return record;
+               }
+       }
+
+       /**
+        * @param <T> The Tuple type
+        * @param <R> The field type at the first level
+        * @param <F> The field type at the innermost level
+        */
+       static final class RecursiveTupleFieldAccessor<T extends Tuple, R, F> 
extends FieldAccessor<T, F> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final int pos;
+               private final FieldAccessor<R, F> innerAccessor;
+
+               RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> 
innerAccessor, TypeInformation<T> typeInfo) {
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+                       checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
+
+                       int arity = ((TupleTypeInfo)typeInfo).getArity();
+                       if(pos < 0 || pos >= arity) {
+                               throw new 
CompositeType.InvalidFieldReferenceException(
+                                       "Tried to select " + ((Integer) 
pos).toString() + ". field on \"" +
+                                               typeInfo.toString() + "\", 
which is an invalid index.");
+                       }
+
+                       if(pos < 0) {
+                               throw new 
CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) 
pos).toString() + ". field.");
+                       }
+
+                       this.pos = pos;
+                       this.innerAccessor = innerAccessor;
+                       this.fieldType = innerAccessor.fieldType;
+               }
+
+               @Override
+               public F get(T record) {
+                       final R inner = record.getField(pos);
+                       return innerAccessor.get(inner);
+               }
+
+               @Override
+               public T set(T record, F fieldValue) {
+                       final R inner = record.getField(pos);
+                       record.setField(innerAccessor.set(inner, fieldValue), 
pos);
+                       return record;
+               }
+       }
+
+       /**
+        * Accessor that reads a POJO member through a reflective {@link Field} and delegates the
+        * remaining access to an inner accessor.
+        *
+        * @param <T> The POJO type
+        * @param <R> The field type at the first level
+        * @param <F> The field type at the innermost level
+        */
+       static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, F> {
+
+               private static final long serialVersionUID = 1L;
+
+               // java.lang.reflect.Field is not Serializable, hence transient; it is written and
+               // restored explicitly in writeObject/readObject via FieldSerializer.
+               private transient Field field;
+               private final FieldAccessor<R, F> innerAccessor;
+
+               /**
+                * @param field reflective handle of the first-level member
+                * @param innerAccessor accessor applied to the value read from {@code field}
+                */
+               PojoFieldAccessor(Field field, FieldAccessor<R, F> innerAccessor) {
+                       checkNotNull(field, "field must not be null.");
+                       checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+                       this.field = field;
+                       this.innerAccessor = innerAccessor;
+                       // get() returns the innermost value, so report the inner accessor's type.
+                       this.fieldType = innerAccessor.fieldType;
+               }
+
+               /**
+                * Reads the first-level member reflectively and hands it to the inner accessor.
+                *
+                * @param pojo the object to read from
+                * @return whatever the inner accessor extracts from the first-level member
+                * @throws RuntimeException wrapping the {@link IllegalAccessException} if the
+                *         reflective read is denied
+                */
+               @Override
+               public F get(T pojo) {
+                       try {
+                               @SuppressWarnings("unchecked")
+                               final R inner = (R)field.get(pojo);
+                               return innerAccessor.get(inner);
+                       } catch (IllegalAccessException iaex) {
+                               throw inaccessibleField(pojo, iaex);
+                       }
+               }
+
+               /**
+                * Lets the inner accessor apply {@code valueToSet} to the first-level member and
+                * writes the inner accessor's result back into that member.
+                *
+                * @param pojo the object to modify in place
+                * @param valueToSet the new innermost value
+                * @return the same {@code pojo} instance that was passed in
+                * @throws RuntimeException wrapping the {@link IllegalAccessException} if the
+                *         reflective read or write is denied
+                */
+               @Override
+               public T set(T pojo, F valueToSet) {
+                       try {
+                               @SuppressWarnings("unchecked")
+                               final R inner = (R)field.get(pojo);
+                               field.set(pojo, innerAccessor.set(inner, valueToSet));
+                               return pojo;
+                       } catch (IllegalAccessException iaex) {
+                               throw inaccessibleField(pojo, iaex);
+                       }
+               }
+
+               /**
+                * Builds the exception thrown when reflective access is denied, keeping the
+                * original IllegalAccessException as the cause.
+                */
+               private RuntimeException inaccessibleField(T pojo, IllegalAccessException cause) {
+                       // Per the original author's note the field is made accessible when it is
+                       // deserialized (presumably inside FieldSerializer.deserializeField, which is
+                       // not visible here), so this path is not expected to be hit.
+                       return new RuntimeException(
+                                       "This should not happen since we call setAccesssible(true) in readObject."
+                                                       + " fields: " + field + " obj: " + pojo, cause);
+               }
+
+               /** Writes the default state, then the transient {@code field} via FieldSerializer. */
+               private void writeObject(ObjectOutputStream out)
+                               throws IOException, ClassNotFoundException {
+                       out.defaultWriteObject();
+                       FieldSerializer.serializeField(field, out);
+               }
+
+               /** Reads the default state, then restores the transient {@code field} via FieldSerializer. */
+               private void readObject(ObjectInputStream in)
+                               throws IOException, ClassNotFoundException {
+                       in.defaultReadObject();
+                       field = FieldSerializer.deserializeField(in);
+               }
+       }
+
+       /**
+        * Accessor for a single, non-nested element of a {@code Product}.
+        *
+        * <p>ProductFieldAccessor exists in two versions which differ in whether another
+        * FieldAccessor is nested inside. This is the version without an inner accessor, which
+        * is probably a little faster.
+        */
+       static final class SimpleProductFieldAccessor<T, F> extends FieldAccessor<T, F> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final int pos;
+               private final TupleSerializerBase<T> serializer;
+               // Scratch buffer reused by every set() call to collect the product's elements.
+               private final Object[] fields;
+               private final int length;
+
+               /**
+                * @param pos index of the element to access
+                * @param typeInfo type information of the product; cast to TupleTypeInfoBase
+                * @param config execution config handed to {@code typeInfo.createSerializer}
+                * @throws CompositeType.InvalidFieldReferenceException if {@code pos} is negative or
+                *         not smaller than the arity reported by {@code typeInfo}
+                */
+               SimpleProductFieldAccessor(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) {
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+
+                       final TupleTypeInfoBase<T> productType = (TupleTypeInfoBase<T>) typeInfo;
+                       final int fieldCount = productType.getArity();
+                       final boolean inRange = pos >= 0 && pos < fieldCount;
+                       if (!inRange) {
+                               throw new CompositeType.InvalidFieldReferenceException(
+                                       "Tried to select " + ((Integer) pos).toString() + ". field on \"" +
+                                               typeInfo.toString() + "\", which is an invalid index.");
+                       }
+
+                       this.pos = pos;
+                       this.fieldType = productType.getTypeAt(pos);
+
+                       final TupleSerializerBase<T> productSerializer =
+                                       (TupleSerializerBase<T>) typeInfo.createSerializer(config);
+                       this.serializer = productSerializer;
+                       this.length = productSerializer.getArity();
+                       this.fields = new Object[this.length];
+               }
+
+               /**
+                * @param record the product to read from
+                * @return the element at {@code pos}, cast to the field type
+                */
+               @SuppressWarnings("unchecked")
+               @Override
+               public F get(T record) {
+                       final Product product = (Product) record;
+                       return (F) product.productElement(pos);
+               }
+
+               /**
+                * Copies every element of {@code record} into the scratch buffer, replaces the
+                * entry at {@code pos} and asks the serializer to create an instance from it.
+                *
+                * @param record the product supplying the remaining elements
+                * @param fieldValue the value to place at {@code pos}
+                * @return the result of {@code serializer.createInstance} on the updated buffer
+                */
+               @Override
+               public T set(T record, F fieldValue) {
+                       final Product product = (Product) record;
+                       int idx = 0;
+                       while (idx < length) {
+                               fields[idx] = product.productElement(idx);
+                               idx++;
+                       }
+                       fields[pos] = fieldValue;
+                       return serializer.createInstance(fields);
+               }
+       }
+
+
+       static final class RecursiveProductFieldAccessor<T, R, F> extends 
FieldAccessor<T, F> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final int pos;
+               private final TupleSerializerBase<T> serializer;
+               private final Object[] fields;
+               private final int length;
+               private final FieldAccessor<R, F> innerAccessor;
+
+               RecursiveProductFieldAccessor(int pos, TypeInformation<T> 
typeInfo, FieldAccessor<R, F> innerAccessor, ExecutionConfig config) {
+                       int arity = ((TupleTypeInfoBase)typeInfo).getArity();
+                       if(pos < 0 || pos >= arity) {
+                               throw new 
CompositeType.InvalidFieldReferenceException(
+                                       "Tried to select " + ((Integer) 
pos).toString() + ". field on \"" +
+                                               typeInfo.toString() + "\", 
which is an invalid index.");
+                       }
+                       checkNotNull(typeInfo, "typeInfo must not be null.");
+                       checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
+
+                       this.pos = pos;
+                       this.fieldType = 
((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos);
+                       this.serializer = 
(TupleSerializerBase<T>)typeInfo.createSerializer(config);
+                       this.length = this.serializer.getArity();
+                       this.fields = new Object[this.length];
+                       this.innerAccessor = innerAccessor;
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public F get(T record) {
+                       return 
innerAccessor.get((R)((Product)record).productElement(pos));
+               }
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public T set(T record, F fieldValue) {
+                       Product prod = (Product) record;
+                       for (int i = 0; i < length; i++) {
+                               fields[i] = prod.productElement(i);
+                       }
+                       fields[pos] = innerAccessor.set((R)fields[pos], 
fieldValue);
+                       return serializer.createInstance(fields);
+               }
+       }
+}

Reply via email to