[FLINK-3702] FieldAccessor refactor to static factory Closes #2094
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/870e219d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/870e219d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/870e219d Branch: refs/heads/master Commit: 870e219d92809df76c843906e19c7c0606529f11 Parents: 1f04542 Author: Marton Balassi <[email protected]> Authored: Thu Nov 3 16:17:46 2016 +0100 Committer: Marton Balassi <[email protected]> Committed: Thu Nov 24 22:22:55 2016 +0100 ---------------------------------------------------------------------- .../api/common/typeinfo/BasicArrayTypeInfo.java | 18 - .../api/common/typeinfo/BasicTypeInfo.java | 26 -- .../InvalidFieldReferenceException.java | 31 -- .../common/typeinfo/PrimitiveArrayTypeInfo.java | 18 - .../api/common/typeinfo/TypeInformation.java | 34 -- .../api/common/typeutils/CompositeType.java | 10 + .../flink/api/java/typeutils/FieldAccessor.java | 324 ---------------- .../flink/api/java/typeutils/PojoTypeInfo.java | 37 +- .../api/java/typeutils/TupleTypeInfoBase.java | 32 +- .../api/java/typeutils/FieldAccessorTest.java | 343 ----------------- .../api/java/functions/SemanticPropUtil.java | 2 +- .../flink/api/java/operator/DataSinkTest.java | 5 +- .../operator/FullOuterJoinOperatorTest.java | 3 +- .../operator/LeftOuterJoinOperatorTest.java | 3 +- .../operator/RightOuterJoinOperatorTest.java | 3 +- .../scala/typeutils/ProductFieldAccessor.java | 75 ---- .../api/scala/typeutils/CaseClassTypeInfo.scala | 31 +- .../scala/typeutils/CaseClassTypeInfoTest.scala | 110 ------ .../streaming/api/datastream/KeyedStream.java | 65 ++-- .../aggregation/ComparableAggregator.java | 7 +- .../functions/aggregation/SumAggregator.java | 7 +- .../streaming/util/typeutils/FieldAccessor.java | 382 +++++++++++++++++++ .../util/typeutils/FieldAccessorFactory.java | 242 ++++++++++++ .../util/typeutils/FieldAccessorTest.java | 358 +++++++++++++++++ flink-streaming-scala/pom.xml | 8 + 
.../flink/streaming/api/scala/KeyedStream.scala | 49 ++- .../api/scala/CaseClassFieldAccessorTest.scala | 137 +++++++ .../streaming/runtime/DataStreamPojoITCase.java | 4 +- 28 files changed, 1209 insertions(+), 1155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java index d04e7d9..25b2850 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java @@ -29,7 +29,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeutils.base.GenericArraySerializer; -import org.apache.flink.api.java.typeutils.FieldAccessor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -122,23 +121,6 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> { } @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) { - return new FieldAccessor.ArrayFieldAccessor<>(pos, this); - } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) { - try { - return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this); - } catch (NumberFormatException ex) { - throw new InvalidFieldReferenceException - ("A field expression on an array must be an integer index (that might be given as a string)."); 
- } - } - - @Override public boolean equals(Object obj) { if (obj instanceof BasicArrayTypeInfo) { BasicArrayTypeInfo<?, ?> other = (BasicArrayTypeInfo<?, ?>) obj; http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java index 09efba6..e2fd74e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java @@ -58,7 +58,6 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer; import org.apache.flink.api.common.typeutils.base.StringComparator; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; -import org.apache.flink.api.java.typeutils.FieldAccessor; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -172,31 +171,6 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T } } - @Override - @PublicEvolving - @SuppressWarnings("unchecked") - public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) { - if(pos != 0) { - throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " + - "basic type (" + this.toString() + "). A field expression on a basic type can only select " + - "the 0th field (which means selecting the entire basic type)."); - } - return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<T>(this); - } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) { - try { - int pos = field.equals("*") ? 
0 : Integer.parseInt(field); - return getFieldAccessor(pos, config); - } catch (NumberFormatException ex) { - throw new InvalidFieldReferenceException("You tried to select the field \"" + field + - "\" on a " + this.toString() + ". A field expression on a basic type can only be \"*\" or \"0\"" + - " (both of which mean selecting the entire basic type)."); - } - } - // -------------------------------------------------------------------------------------------- @Override http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java deleted file mode 100644 index 3c67c46..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.common.typeinfo; - -import org.apache.flink.annotation.PublicEvolving; - -@PublicEvolving -public class InvalidFieldReferenceException extends IllegalArgumentException { - - private static final long serialVersionUID = 1L; - - public InvalidFieldReferenceException(String s) { - super(s); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java index 2bd96d3..1c6ce00 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java @@ -40,7 +40,6 @@ import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerial import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator; import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer; -import org.apache.flink.api.java.typeutils.FieldAccessor; import java.util.HashMap; import java.util.Map; @@ -139,23 +138,6 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato return this.serializer; } - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) { - return new FieldAccessor.ArrayFieldAccessor<>(pos, this); - } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) { - try { - return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this); - } catch (NumberFormatException ex) { - throw new 
InvalidFieldReferenceException - ("A field expression on an array must be an integer index (that might be given as a string)."); - } - } - /** * Gets the class that represents the component type. * @return The class of the component type. http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java index 7be2b68..154ceb1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java @@ -24,7 +24,6 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.FieldAccessor; import org.apache.flink.api.java.typeutils.TypeExtractor; import java.io.Serializable; @@ -173,39 +172,6 @@ public abstract class TypeInformation<T> implements Serializable { @PublicEvolving public abstract TypeSerializer<T> createSerializer(ExecutionConfig config); - - /** - * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set - * the specified field on instances of this type. 
- * - * @param pos The field position (zero-based) - * @param config Configuration object - * @param <F> The type of the field to access - * @return The created FieldAccessor - */ - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){ - throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString() - + "Referencing a field by position is supported on tuples, case classes, and arrays. " - + "Additionally, you can select the 0th field of a primitive/basic type (e.g. int)."); - } - - /** - * Creates a {@link FieldAccessor} for the field that is given by a field expression, - * which can be used to get and set the specified field on instances of this type. - * - * @param field The field expression - * @param config Configuration object - * @param <F> The type of the field to access - * @return The created FieldAccessor - */ - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) { - throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString() - + "Field expressions are only supported on POJO types, tuples, and case classes. 
" - + "(See the Flink documentation on what is considered a POJO.)"); - } - @Override public abstract String toString(); http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java index a4230f4..4bf17ea 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java @@ -265,6 +265,16 @@ public abstract class CompositeType<T> extends TypeInformation<T> { @PublicEvolving public abstract int getFieldIndex(String fieldName); + @PublicEvolving + public static class InvalidFieldReferenceException extends IllegalArgumentException { + + private static final long serialVersionUID = 1L; + + public InvalidFieldReferenceException(String s) { + super(s); + } + } + @Override public boolean equals(Object obj) { if (obj instanceof CompositeType) { http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java deleted file mode 100644 index 97ef31a..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.operators.Keys; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.lang.reflect.Array; -import java.lang.reflect.Field; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.flink.util.Preconditions.checkNotNull; - - -/** - * These classes encapsulate the logic of accessing a field specified by the user as either an index - * or a field expression string. TypeInformation can also be requested for the field. - * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). - * - * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. - * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". 
- * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) - */ -@PublicEvolving -public abstract class FieldAccessor<T, F> implements Serializable { - - private static final long serialVersionUID = 1L; - - protected TypeInformation fieldType; - - /** - * Gets the TypeInformation for the type of the field. - * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). - */ - @SuppressWarnings("unchecked") - public TypeInformation<F> getFieldType() { - return fieldType; - } - - - /** - * Gets the value of the field (specified in the constructor) of the given record. - * @param record The record on which the field will be accessed - * @return The value of the field. - */ - public abstract F get(T record); - - /** - * Sets the field (specified in the constructor) of the given record to the given value. - * - * Warning: This might modify the original object, or might return a new object instance. - * (This is necessary, because the record might be immutable.) - * - * @param record The record to modify - * @param fieldValue The new value of the field - * @return A record that has the given field value. (this might be a new instance or the original) - */ - public abstract T set(T record, F fieldValue); - - - // -------------------------------------------------------------------------------------------------- - - - /** - * This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a - * field of a POJO that is itself some composite type but is not further decomposed) - */ - public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> { - - private static final long serialVersionUID = 1L; - - public SimpleFieldAccessor(TypeInformation<T> typeInfo) { - checkNotNull(typeInfo, "typeInfo must not be null."); - - this.fieldType = typeInfo; - } - - @Override - public T get(T record) { - return record; - } - - @Override - public T set(T record, T fieldValue) { - return fieldValue; - } - } - - public final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> { - - private static final long serialVersionUID = 1L; - - private final int pos; - - public ArrayFieldAccessor(int pos, TypeInformation typeInfo) { - if(pos < 0) { - throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on" + - " an array, which is an invalid index."); - } - checkNotNull(typeInfo, "typeInfo must not be null."); - - this.pos = pos; - this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType()); - } - - @SuppressWarnings("unchecked") - @Override - public F get(T record) { - return (F) Array.get(record, pos); - } - - @Override - public T set(T record, F fieldValue) { - Array.set(record, pos, fieldValue); - return record; - } - } - - /** - * There are two versions of TupleFieldAccessor, differing in whether there is an other - * FieldAccessor nested inside. The no inner accessor version is probably a little faster. - */ - static final class SimpleTupleFieldAccessor<T, F> extends FieldAccessor<T, F> { - - private static final long serialVersionUID = 1L; - - private final int pos; - - SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) { - int arity = ((TupleTypeInfo)typeInfo).getArity(); - if(pos < 0 || pos >= arity) { - throw new InvalidFieldReferenceException( - "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + - typeInfo.toString() + "\", which is an invalid index."); - } - checkNotNull(typeInfo, "typeInfo must not be null."); - - this.pos = pos; - this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos); - } - - @SuppressWarnings("unchecked") - @Override - public F get(T record) { - final Tuple tuple = (Tuple) record; - return (F) tuple.getField(pos); - } - - @Override - public T set(T record, F fieldValue) { - final Tuple tuple = (Tuple) record; - tuple.setField(fieldValue, pos); - return record; - } - } - - /** - * @param <T> The Tuple type - * @param <R> The field type at the first level - * @param <F> The field type at the innermost level - */ - static final class RecursiveTupleFieldAccessor<T, R, F> extends FieldAccessor<T, F> { - - private static final long serialVersionUID = 1L; - - private final int pos; - private final FieldAccessor<R, F> innerAccessor; - - RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> innerAccessor) { - if(pos < 0) { - throw new InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". 
field."); - } - checkNotNull(innerAccessor, "innerAccessor must not be null."); - - this.pos = pos; - this.innerAccessor = innerAccessor; - this.fieldType = innerAccessor.fieldType; - } - - @Override - public F get(T record) { - final Tuple tuple = (Tuple) record; - final R inner = tuple.getField(pos); - return innerAccessor.get(inner); - } - - @Override - public T set(T record, F fieldValue) { - final Tuple tuple = (Tuple) record; - final R inner = tuple.getField(pos); - tuple.setField(innerAccessor.set(inner, fieldValue), pos); - return record; - } - } - - /** - * @param <T> The POJO type - * @param <R> The field type at the first level - * @param <F> The field type at the innermost level - */ - static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, F> { - - private static final long serialVersionUID = 1L; - - private transient Field field; - private final FieldAccessor<R, F> innerAccessor; - - PojoFieldAccessor(Field field, FieldAccessor<R, F> innerAccessor) { - checkNotNull(field, "field must not be null."); - checkNotNull(innerAccessor, "innerAccessor must not be null."); - - this.field = field; - this.innerAccessor = innerAccessor; - this.fieldType = innerAccessor.fieldType; - } - - @Override - public F get(T pojo) { - try { - @SuppressWarnings("unchecked") - final R inner = (R)field.get(pojo); - return innerAccessor.get(inner); - } catch (IllegalAccessException iaex) { - throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." - + " fields: " + field + " obj: " + pojo); - } - } - - @Override - public T set(T pojo, F valueToSet) { - try { - @SuppressWarnings("unchecked") - final R inner = (R)field.get(pojo); - field.set(pojo, innerAccessor.set(inner, valueToSet)); - return pojo; - } catch (IllegalAccessException iaex) { - throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." 
- + " fields: " + field + " obj: " + pojo); - } - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - FieldSerializer.serializeField(field, out); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - field = FieldSerializer.deserializeField(in); - } - } - - - // -------------------------------------------------------------------------------------------------- - - private final static String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // This can start with a digit (because of Tuples) - private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?"; - private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS - +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR - +"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA; - - private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD); - - public static FieldExpression decomposeFieldExpression(String fieldExpression) { - Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression); - if(!matcher.matches()) { - throw new InvalidFieldReferenceException("Invalid field expression \""+fieldExpression+"\"."); - } - - String head = matcher.group(0); - if(head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR) || head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA)) { - throw new InvalidFieldReferenceException("No wildcards are allowed here."); - } else { - head = matcher.group(1); - } - - String tail = matcher.group(3); - - return new FieldExpression(head, tail); - } - - /** - * Represents a decomposition of a field expression into its first part, and the rest. - * E.g. "foo.f1.bar" is decomposed into "foo" and "f1.bar". 
- */ - public static class FieldExpression implements Serializable { - - private static final long serialVersionUID = 1L; - - public String head, tail; // tail can be null, if the field expression had just one part - - FieldExpression(String head, String tail) { - this.head = head; - this.tail = tail; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java index 72432d6..8a4fbbe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeComparator; @@ -133,7 +132,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> { // gives only some undefined order. 
return false; } - + @Override @PublicEvolving @@ -319,39 +318,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> { return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config); } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) { - - FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); - - // get field - PojoField field = null; - TypeInformation<?> fieldType = null; - for (int i = 0; i < fields.length; i++) { - if (fields[i].getField().getName().equals(decomp.head)) { - field = fields[i]; - fieldType = fields[i].getTypeInformation(); - break; - } - } - if (field == null) { - throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+"."); - } - - if(decomp.tail == null) { - @SuppressWarnings("unchecked") - FieldAccessor<F,F> innerAccessor = new FieldAccessor.SimpleFieldAccessor<F>((TypeInformation<F>) fieldType); - return new FieldAccessor.PojoFieldAccessor<T, F, F>(field.getField(), innerAccessor); - } else { - @SuppressWarnings("unchecked") - FieldAccessor<Object,F> innerAccessor = - (FieldAccessor<Object,F>)fieldType.<F>getFieldAccessor(decomp.tail, config); - return new FieldAccessor.PojoFieldAccessor<T, Object, F>(field.getField(), innerAccessor); - } - } - + @Override public boolean equals(Object obj) { if (obj instanceof PojoTypeInfo) { http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java index c9a55fc..807fd54 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java @@ -23,10 +23,7 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.Keys.ExpressionKeys; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; @@ -206,34 +203,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> { TypeInformation<X> typed = (TypeInformation<X>) this.types[pos]; return typed; } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) { - return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(pos, this); - } - - @Override - @PublicEvolving - public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) { - FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression); - int fieldPos = this.getFieldIndex(decomp.head); - if (fieldPos == -1) { - try { - fieldPos = Integer.parseInt(decomp.head); - } catch (NumberFormatException ex) { - throw new InvalidFieldReferenceException("Tried to select field \"" + decomp.head - + "\" on " + this.toString()); - } - } - if (decomp.tail == null) { - return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(fieldPos, this); - } else { - FieldAccessor<?, F> innerAccessor = getTypeAt(fieldPos).getFieldAccessor(decomp.tail, config); - return new FieldAccessor.RecursiveTupleFieldAccessor<>(fieldPos, innerAccessor); - } - } - + @Override public boolean equals(Object obj) { if (obj instanceof TupleTypeInfoBase) { http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java 
---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java deleted file mode 100644 index f780447..0000000 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.typeutils; - -import static org.junit.Assert.*; - -import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.junit.Test; - -public class FieldAccessorTest { - - // Note, that AggregationFunctionTest indirectly also tests FieldAccessors. - // ProductFieldAccessor is tested in CaseClassTypeInfoTest. 
- - @Test - public void testFlatTuple() { - Tuple2<String, Integer> t = Tuple2.of("aa", 5); - TupleTypeInfo<Tuple2<String, Integer>> tpeInfo = - (TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t); - - FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null); - assertEquals("aa", f0.get(t)); - assertEquals("aa", t.f0); - t = f0.set(t, "b"); - assertEquals("b", f0.get(t)); - assertEquals("b", t.f0); - - FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null); - assertEquals(5, (int) f1.get(t)); - assertEquals(5, (int) t.f1); - t = f1.set(t, 7); - assertEquals(7, (int) f1.get(t)); - assertEquals(7, (int) t.f1); - assertEquals("b", f0.get(t)); - assertEquals("b", t.f0); - - - FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null); - assertEquals(7, (int) f1n.get(t)); - assertEquals(7, (int) t.f1); - t = f1n.set(t, 10); - assertEquals(10, (int) f1n.get(t)); - assertEquals(10, (int) f1.get(t)); - assertEquals(10, (int) t.f1); - assertEquals("b", f0.get(t)); - assertEquals("b", t.f0); - - FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = tpeInfo.getFieldAccessor("1", null); - assertEquals(10, (int) f1ns.get(t)); - assertEquals(10, (int) t.f1); - t = f1ns.set(t, 11); - assertEquals(11, (int) f1ns.get(t)); - assertEquals(11, (int) f1.get(t)); - assertEquals(11, (int) t.f1); - assertEquals("b", f0.get(t)); - assertEquals("b", t.f0); - - // This is technically valid (the ".0" is selecting the 0th field of a basic type). 
- FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null); - assertEquals("b", f0_0.get(t)); - assertEquals("b", t.f0); - t = f0_0.set(t, "cc"); - assertEquals("cc", f0_0.get(t)); - assertEquals("cc", t.f0); - - try { - FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null); - assertFalse("Expected exception, because of bad field name", false); - } catch (InvalidFieldReferenceException ex) { - // OK - } - } - - @Test - public void testTupleInTuple() { - Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0)); - TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo = - (TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t); - - FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null); - assertEquals("aa", f0.get(t)); - assertEquals("aa", t.f0); - - FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null); - assertEquals(2.0, f1f2.get(t), 0); - assertEquals(2.0, t.f1.f2, 0); - t = f1f2.set(t, 3.0); - assertEquals(3.0, f1f2.get(t), 0); - assertEquals(3.0, t.f1.f2, 0); - assertEquals("aa", f0.get(t)); - assertEquals("aa", t.f0); - - FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null); - assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t)); - assertEquals(Tuple3.of(5, 9L, 3.0), t.f1); - t = f1.set(t, Tuple3.of(8, 12L, 4.0)); - assertEquals(Tuple3.of(8, 12L, 4.0), f1.get(t)); - assertEquals(Tuple3.of(8, 12L, 4.0), t.f1); - assertEquals("aa", f0.get(t)); - assertEquals("aa", t.f0); - - FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1n = tpeInfo.getFieldAccessor(1, null); - assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t)); - assertEquals(Tuple3.of(8, 12L, 4.0), t.f1); - t = f1n.set(t, 
Tuple3.of(10, 13L, 5.0)); - assertEquals(Tuple3.of(10, 13L, 5.0), f1n.get(t)); - assertEquals(Tuple3.of(10, 13L, 5.0), f1.get(t)); - assertEquals(Tuple3.of(10, 13L, 5.0), t.f1); - assertEquals("aa", f0.get(t)); - assertEquals("aa", t.f0); - } - - @Test - @SuppressWarnings("unchecked") - public void testTupleFieldAccessorOutOfBounds() { - try { - TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class).getFieldAccessor(2, null); - fail(); - } catch (InvalidFieldReferenceException e) { - // Nothing to do here - } - } - - public static class Foo { - public int x; - public Tuple2<String, Long> t; - public Short y; - - public Foo() {} - - public Foo(int x, Tuple2<String, Long> t, Short y) { - this.x = x; - this.t = t; - this.y = y; - } - } - - @Test - public void testTupleInPojoInTuple() { - Tuple2<String, Foo> t = Tuple2.of("aa", new Foo(8, Tuple2.of("ddd", 9L), (short) 2)); - TupleTypeInfo<Tuple2<String, Foo>> tpeInfo = - (TupleTypeInfo<Tuple2<String, Foo>>)TypeExtractor.getForObject(t); - - FieldAccessor<Tuple2<String, Foo>, Long> f1tf1 = tpeInfo.getFieldAccessor("f1.t.f1", null); - assertEquals(9L, (long) f1tf1.get(t)); - assertEquals(9L, (long) t.f1.t.f1); - t = f1tf1.set(t, 12L); - assertEquals(12L, (long) f1tf1.get(t)); - assertEquals(12L, (long) t.f1.t.f1); - - FieldAccessor<Tuple2<String, Foo>, String> f1tf0 = tpeInfo.getFieldAccessor("f1.t.f0", null); - assertEquals("ddd", f1tf0.get(t)); - assertEquals("ddd", t.f1.t.f0); - t = f1tf0.set(t, "alma"); - assertEquals("alma", f1tf0.get(t)); - assertEquals("alma", t.f1.t.f0); - - FieldAccessor<Tuple2<String, Foo>, Foo> f1 = tpeInfo.getFieldAccessor("f1", null); - FieldAccessor<Tuple2<String, Foo>, Foo> f1n = tpeInfo.getFieldAccessor(1, null); - assertEquals(Tuple2.of("alma", 12L), f1.get(t).t); - assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t); - assertEquals(Tuple2.of("alma", 12L), t.f1.t); - Foo newFoo = new Foo(8, Tuple2.of("ddd", 9L), (short) 2); - f1.set(t, newFoo); - assertEquals(newFoo, 
f1.get(t)); - assertEquals(newFoo, f1n.get(t)); - assertEquals(newFoo, t.f1); - } - - - public static class Inner { - public long x; - public boolean b; - - public Inner(){} - - public Inner(long x) { - this.x = x; - } - - public Inner(long x, boolean b) { - this.x = x; - this.b = b; - } - - @Override - public String toString() { - return ((Long)x).toString() + ", " + b; - } - } - - public static class Outer { - public int a; - public Inner i; - public short b; - - public Outer(){} - - public Outer(int a, Inner i, short b) { - this.a = a; - this.i = i; - this.b = b; - } - - @Override - public String toString() { - return a+", "+i.toString()+", "+b; - } - } - - @Test - public void testPojoInPojo() { - Outer o = new Outer(10, new Inner(4L), (short)12); - PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>)TypeInformation.of(Outer.class); - - FieldAccessor<Outer, Long> fix = tpeInfo.getFieldAccessor("i.x", null); - assertEquals(4L, (long) fix.get(o)); - assertEquals(4L, o.i.x); - o = fix.set(o, 22L); - assertEquals(22L, (long) fix.get(o)); - assertEquals(22L, o.i.x); - - FieldAccessor<Outer, Inner> fi = tpeInfo.getFieldAccessor("i", null); - assertEquals(22L, fi.get(o).x); - assertEquals(22L, (long) fix.get(o)); - assertEquals(22L, o.i.x); - o = fi.set(o, new Inner(30L)); - assertEquals(30L, fi.get(o).x); - assertEquals(30L, (long) fix.get(o)); - assertEquals(30L, o.i.x); - } - - @Test - @SuppressWarnings("unchecked") - public void testArray() { - int[] a = new int[]{3,5}; - FieldAccessor<int[], Integer> fieldAccessor = - (FieldAccessor<int[], Integer>) (Object) - PrimitiveArrayTypeInfo.getInfoFor(a.getClass()).getFieldAccessor(1, null); - - assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass()); - - assertEquals((Integer)a[1], fieldAccessor.get(a)); - - a = fieldAccessor.set(a, 6); - assertEquals((Integer)a[1], fieldAccessor.get(a)); - - - - Integer[] b = new Integer[]{3,5}; - FieldAccessor<Integer[], Integer> fieldAccessor2 = - 
(FieldAccessor<Integer[], Integer>) (Object) - BasicArrayTypeInfo.getInfoFor(b.getClass()).getFieldAccessor(1, null); - - assertEquals(Integer.class, fieldAccessor2.getFieldType().getTypeClass()); - - assertEquals(b[1], fieldAccessor2.get(b)); - - b = fieldAccessor2.set(b, 6); - assertEquals(b[1], fieldAccessor2.get(b)); - } - - public static class ArrayInPojo { - public long x; - public int[] arr; - public int y; - - public ArrayInPojo() {} - - public ArrayInPojo(long x, int[] arr, int y) { - this.x = x; - this.arr = arr; - this.y = y; - } - } - - @Test - public void testArrayInPojo() { - ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12); - PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class); - - FieldAccessor<ArrayInPojo, Integer> fix = tpeInfo.getFieldAccessor("arr.1", null); - assertEquals(4, (int) fix.get(o)); - assertEquals(4L, o.arr[1]); - o = fix.set(o, 8); - assertEquals(8, (int) fix.get(o)); - assertEquals(8, o.arr[1]); - } - - @Test - public void testBasicType() { - Long x = 7L; - TypeInformation<Long> tpeInfo = BasicTypeInfo.LONG_TYPE_INFO; - - try { - FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(1, null); - assertFalse("Expected exception, because not the 0th field selected for a basic type.", false); - } catch (InvalidFieldReferenceException ex) { - // OK - } - - try { - FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor("foo", null); - assertFalse("Expected exception, because not the 0th field selected for a basic type.", false); - } catch (InvalidFieldReferenceException ex) { - // OK - } - - FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(0, null); - assertEquals(7L, (long) f.get(x)); - x = f.set(x, 12L); - assertEquals(12L, (long) f.get(x)); - assertEquals(12L, (long) x); - - FieldAccessor<Long, Long> f2 = tpeInfo.getFieldAccessor("*", null); - assertEquals(12L, (long) f2.get(x)); - x = f2.set(x, 14L); - assertEquals(14L, (long) f2.get(x)); - assertEquals(14L, (long) x); 
- } -} http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index 4a0b2fc..aedba15 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -27,8 +27,8 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties; import org.apache.flink.api.common.operators.util.FieldSet; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java index 0da417b..0493583 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java @@ -20,7 +20,6 @@ package 
org.apache.flink.api.java.operator; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -167,7 +166,7 @@ public class DataSinkTest { .sortLocalOutput(5, Order.DESCENDING); } - @Test(expected = InvalidFieldReferenceException.class) + @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testFailTupleInv() { final ExecutionEnvironment env = ExecutionEnvironment @@ -285,7 +284,7 @@ public class DataSinkTest { .sortLocalOutput(1, Order.DESCENDING); } - @Test(expected = InvalidFieldReferenceException.class) + @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testFailPojoInvalidField() { final ExecutionEnvironment env = ExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java index 9f5cfb2..9f2aa41 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import 
org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -136,7 +135,7 @@ public class FullOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = InvalidFieldReferenceException.class) + @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testFullOuter8() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java index 914c75c..bfcc3e8 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -137,7 +136,7 @@ public class LeftOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = InvalidFieldReferenceException.class) + @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testLeftOuter8() { 
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java index f5d8129..709d830 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -136,7 +135,7 @@ public class RightOuterJoinOperatorTest { .with(new DummyJoin()); } - @Test(expected = InvalidFieldReferenceException.class) + @Test(expected = CompositeType.InvalidFieldReferenceException.class) public void testRightOuter8() { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo); http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java 
---------------------------------------------------------------------- diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java deleted file mode 100644 index 0be6f33..0000000 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.scala.typeutils; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.FieldAccessor; -import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; -import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; -import scala.Product; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -public final class ProductFieldAccessor<T, R, F> extends FieldAccessor<T, F> { - - private static final long serialVersionUID = 1L; - - private final int pos; - private final TupleSerializerBase<T> serializer; - private final Object[] fields; - private final int length; - private final FieldAccessor<R, F> innerAccessor; - - ProductFieldAccessor(int pos, TypeInformation<T> typeInfo, FieldAccessor<R, F> innerAccessor, ExecutionConfig config) { - int arity = ((TupleTypeInfoBase)typeInfo).getArity(); - if(pos < 0 || pos >= arity) { - throw new InvalidFieldReferenceException( - "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + - typeInfo.toString() + "\", which is an invalid index."); - } - checkNotNull(typeInfo, "typeInfo must not be null."); - checkNotNull(innerAccessor, "innerAccessor must not be null."); - - this.pos = pos; - this.fieldType = ((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos); - this.serializer = (TupleSerializerBase<T>)typeInfo.createSerializer(config); - this.length = this.serializer.getArity(); - this.fields = new Object[this.length]; - this.innerAccessor = innerAccessor; - } - - @SuppressWarnings("unchecked") - @Override - public F get(T record) { - return innerAccessor.get((R)((Product)record).productElement(pos)); - } - - @SuppressWarnings("unchecked") - @Override - public T set(T record, F fieldValue) { - Product prod = (Product)record; - for (int i = 0; i < length; i++) { - fields[i] = prod.productElement(i); - } - fields[pos] = innerAccessor.set((R)fields[pos], fieldValue); - return serializer.createInstance(fields); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala index d970dfd..70ca412 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala @@ -25,12 +25,10 @@ import org.apache.flink.annotation.{Public, PublicEvolving} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.operators.Keys import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder} +import 
org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, InvalidFieldReferenceException, TypeComparatorBuilder} import org.apache.flink.api.common.typeutils._ import Keys.ExpressionKeys -import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException -import org.apache.flink.api.java.typeutils.FieldAccessor.SimpleFieldAccessor -import org.apache.flink.api.java.typeutils.{FieldAccessor, TupleTypeInfoBase} +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -239,31 +237,6 @@ abstract class CaseClassTypeInfo[T <: Product]( } } - override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = { - new ProductFieldAccessor[T,F,F]( - pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config) - } - - override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig): - FieldAccessor[T, F] = { - - val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression) - - val pos = getFieldIndex(decomp.head) - if(pos < 0) { - throw new InvalidFieldReferenceException("Invalid field selected: " + fieldExpression) - } - val fieldType = types(pos) - - if (decomp.tail == null) { - getFieldAccessor(pos, config) - } else { - val innerAccessor = - fieldType.getFieldAccessor[F](decomp.tail, config).asInstanceOf[FieldAccessor[AnyRef, F]] - new ProductFieldAccessor[T,Object,F](pos, this, innerAccessor, config) - } - } - override def toString: String = { clazz.getName + "(" + fieldNames.zip(types).map { case (n, t) => n + ": " + t http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala 
b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala index a9abea1..479483f 100644 --- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala @@ -21,11 +21,9 @@ package org.apache.flink.api.scala.typeutils import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.typeutils.FieldAccessorTest import org.apache.flink.util.TestLogger import org.junit.Test import org.scalatest.junit.JUnitSuiteLike -import org.apache.flink.api.scala._ class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike { @@ -72,112 +70,4 @@ class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike { assert(!tpeInfo1.equals(tpeInfo2)) } - @Test - def testFieldAccessorFlatCaseClass(): Unit = { - case class IntBoolean(foo: Int, bar: Boolean) - val tpeInfo = createTypeInformation[IntBoolean] - - { - // by field name - val accessor1 = tpeInfo.getFieldAccessor[Int]("foo", null) - val accessor2 = tpeInfo.getFieldAccessor[Boolean]("bar", null) - - val x1 = IntBoolean(5, false) - assert(accessor1.get(x1) == 5) - assert(accessor2.get(x1) == false) - assert(x1.foo == 5) - assert(x1.bar == false) - - val x2: IntBoolean = accessor1.set(x1, 6) - assert(accessor1.get(x2) == 6) - assert(x2.foo == 6) - - val x3 = accessor2.set(x2, true) - assert(x3.bar == true) - assert(accessor2.get(x3) == true) - assert(x3.foo == 6) - } - - { - // by field pos - val accessor1 = tpeInfo.getFieldAccessor[Int](0, null) - val accessor2 = tpeInfo.getFieldAccessor[Boolean](1, null) - - val x1 = IntBoolean(5, false) - assert(accessor1.get(x1) == 5) - assert(accessor2.get(x1) == false) - assert(x1.foo == 5) - assert(x1.bar == false) - - val x2: IntBoolean = accessor1.set(x1, 6) - 
assert(accessor1.get(x2) == 6) - assert(x2.foo == 6) - - val x3 = accessor2.set(x2, true) - assert(x3.bar == true) - assert(accessor2.get(x3) == true) - assert(x3.foo == 6) - } - } - - @Test - def testFieldAccessorTuple(): Unit = { - val tpeInfo = createTypeInformation[(Int, Long)] - var x = (5, 6L) - val f0 = tpeInfo.getFieldAccessor[Int](0, null) - assert(f0.get(x) == 5) - x = f0.set(x, 8) - assert(f0.get(x) == 8) - assert(x._1 == 8) - } - - @Test - def testFieldAccessorCaseClassInCaseClass(): Unit = { - case class Inner(a: Short, b: String) - case class Outer(a: Int, i: Inner, b: Boolean) - val tpeInfo = createTypeInformation[Outer] - - var x = Outer(1, Inner(2, "alma"), true) - - val fib = tpeInfo.getFieldAccessor[String]("i.b", null) - assert(fib.get(x) == "alma") - assert(x.i.b == "alma") - x = fib.set(x, "korte") - assert(fib.get(x) == "korte") - assert(x.i.b == "korte") - - val fi = tpeInfo.getFieldAccessor[Inner]("i", null) - assert(fi.get(x) == Inner(2, "korte")) - x = fi.set(x, Inner(3, "aaa")) - assert(x.i == Inner(3, "aaa")) - } - - @Test - def testFieldAccessorPojoInCaseClass(): Unit = { - case class Outer(a: Int, i: FieldAccessorTest.Inner, b: Boolean) - var x = Outer(1, new FieldAccessorTest.Inner(3L, true), false) - val tpeInfo = createTypeInformation[Outer] - val cfg = new ExecutionConfig - - val fib = tpeInfo.getFieldAccessor[Boolean]("i.b", cfg) - assert(fib.get(x) == true) - assert(x.i.b == true) - x = fib.set(x, false) - assert(fib.get(x) == false) - assert(x.i.b == false) - - val fi = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner]("i", cfg) - assert(fi.get(x).x == 3L) - assert(x.i.x == 3L) - x = fi.set(x, new FieldAccessorTest.Inner(4L, true)) - assert(fi.get(x).x == 4L) - assert(x.i.x == 4L) - - val fin = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner](1, cfg) - assert(fin.get(x).x == 4L) - assert(x.i.x == 4L) - x = fin.set(x, new FieldAccessorTest.Inner(5L, true)) - assert(fin.get(x).x == 5L) - assert(x.i.x == 5L) - } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 264d5d0..5b00bcd 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -384,9 +384,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. * Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. - * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> sum(String field) { @@ -400,8 +399,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param positionToMin * The field position in the data points to minimize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> min(int positionToMin) { @@ -422,9 +421,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. 
* Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. - * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> min(String field) { @@ -438,9 +436,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * per key. * * @param positionToMax - * The field position in the data points to maximize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * The field position in the data points to maximize. This is applicable to + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> max(int positionToMax) { @@ -461,9 +459,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. * Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. - * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> max(String field) { @@ -484,9 +481,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation.
* Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. - * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @param first * If True then in case of field equality the first object will * be returned @@ -511,9 +507,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. * Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. - * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @param first * If True then in case of field equality the first object will * be returned @@ -532,8 +527,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param positionToMinBy * The field position in the data points to minimize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(int positionToMinBy) { @@ -551,9 +546,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. * Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. 
- * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> minBy(String positionToMinBy) { @@ -569,8 +563,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * * @param positionToMinBy * The field position in the data points to minimize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @param first * If true, then the operator return the first element with the * minimal value, otherwise returns the last @@ -588,9 +582,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * given position, the operator returns the first one by default. * * @param positionToMaxBy - * The field position in the data points to maximize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * The field position in the data points to minimize. This is applicable to + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) { @@ -608,9 +602,8 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * name of the (public) field on which to perform the aggregation. * Additionally, a dot can be used to drill down into nested * objects, as in {@code "field1.fieldxy" }. 
- * Furthermore, an array index can also be specified in case of an array of - * a primitive or basic type; or "0" or "*" can be specified in case of a - * basic type (which is considered as having only one field). + * Furthermore "*" can be specified in case of a basic type + * (which is considered as having only one field). * @return The transformed DataStream. */ public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) { @@ -625,9 +618,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> { * depending on the parameter set. * * @param positionToMaxBy - * The field position in the data points to maximize. This is applicable to - * Tuple types, basic and primitive array types, Scala case classes, - * and primitive types (which is considered as having one field). + * The field position in the data points to minimize. This is applicable to + * Tuple types, Scala case classes, and primitive types (which is considered + * as having one field). * @param first * If true, then the operator return the first element with the * maximum value, otherwise returns the last http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java index 465548e..c634434 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java @@ -20,7 +20,8 @@ package org.apache.flink.streaming.api.functions.aggregation; import org.apache.flink.annotation.Internal; import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.FieldAccessor; +import org.apache.flink.streaming.util.typeutils.FieldAccessor; +import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory; @Internal public class ComparableAggregator<T> extends AggregationFunction<T> { @@ -51,7 +52,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { AggregationType aggregationType, boolean first, ExecutionConfig config) { - this(aggregationType, typeInfo.getFieldAccessor(positionToAggregate, config), first); + this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first); } public ComparableAggregator(String field, @@ -59,7 +60,7 @@ public class ComparableAggregator<T> extends AggregationFunction<T> { AggregationType aggregationType, boolean first, ExecutionConfig config) { - this(aggregationType, typeInfo.getFieldAccessor(field,config), first); + this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first); } http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java index 90d5e74..5e1378e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/aggregation/SumAggregator.java @@ -22,8 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.util.typeutils.FieldAccessorFactory; import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.FieldAccessor; +import org.apache.flink.streaming.util.typeutils.FieldAccessor; @Internal public class SumAggregator<T> extends AggregationFunction<T> { @@ -36,7 +37,7 @@ public class SumAggregator<T> extends AggregationFunction<T> { private final boolean isTuple; public SumAggregator(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) { - fieldAccessor = typeInfo.getFieldAccessor(pos, config); + fieldAccessor = FieldAccessorFactory.getAccessor(typeInfo, pos, config); adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass()); if (typeInfo instanceof TupleTypeInfo) { isTuple = true; @@ -48,7 +49,7 @@ public class SumAggregator<T> extends AggregationFunction<T> { } public SumAggregator(String field, TypeInformation<T> typeInfo, ExecutionConfig config) { - fieldAccessor = typeInfo.getFieldAccessor(field, config); + fieldAccessor = FieldAccessorFactory.getAccessor(typeInfo, field, config); adder = SumFunction.getForClass(fieldAccessor.getFieldType().getTypeClass()); if (typeInfo instanceof TupleTypeInfo) { isTuple = true; http://git-wip-us.apache.org/repos/asf/flink/blob/870e219d/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java new file mode 100644 index 0000000..2828308 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). 
+ * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, RecursiveProductFieldAccessor) + */ +@Internal +public abstract class FieldAccessor<T, F> implements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** + * Gets the TypeInformation for the type of the field. + * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). + */ + @SuppressWarnings("unchecked") + public TypeInformation<F> getFieldType() { + return fieldType; + } + + + /** + * Gets the value of the field (specified in the constructor) of the given record. + * @param record The record on which the field will be accessed + * @return The value of the field. + */ + public abstract F get(T record); + + /** + * Sets the field (specified in the constructor) of the given record to the given value. + * + * Warning: This might modify the original object, or might return a new object instance. + * (This is necessary, because the record might be immutable.) + * + * @param record The record to modify + * @param fieldValue The new value of the field + * @return A record that has the given field value. (this might be a new instance or the original) + */ + public abstract T set(T record, F fieldValue); + + + // -------------------------------------------------------------------------------------------------- + + + /** + * This is when the entire record is considered as a single field. (eg. 
field 0 of a basic type, or a + * field of a POJO that is itself some composite type but is not further decomposed) + */ + final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { + return fieldValue; + } + } + + final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + + public ArrayFieldAccessor(int pos, TypeInformation typeInfo) { + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on" + + " an array, which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.pos = pos; + this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType()); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return (F) Array.get(record, pos); + } + + @Override + public T set(T record, F fieldValue) { + Array.set(record, pos, fieldValue); + return record; + } + } + + /** + * There are two versions of TupleFieldAccessor, differing in whether there is an other + * FieldAccessor nested inside. The no inner accessor version is probably a little faster. 
+ */ + static final class SimpleTupleFieldAccessor<T extends Tuple, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + + SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + this.pos = pos; + this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return (F) record.getField(pos); + } + + @Override + public T set(T record, F fieldValue) { + record.setField(fieldValue, pos); + return record; + } + } + + /** + * @param <T> The Tuple type + * @param <R> The field type at the first level + * @param <F> The field type at the innermost level + */ + static final class RecursiveTupleFieldAccessor<T extends Tuple, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final FieldAccessor<R, F> innerAccessor; + + RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> innerAccessor, TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". 
field."); + } + + this.pos = pos; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T record) { + final R inner = record.getField(pos); + return innerAccessor.get(inner); + } + + @Override + public T set(T record, F fieldValue) { + final R inner = record.getField(pos); + record.setField(innerAccessor.set(inner, fieldValue), pos); + return record; + } + } + + /** + * @param <T> The POJO type + * @param <R> The field type at the first level + * @param <F> The field type at the innermost level + */ + static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private transient Field field; + private final FieldAccessor<R, F> innerAccessor; + + PojoFieldAccessor(Field field, FieldAccessor<R, F> innerAccessor) { + checkNotNull(field, "field must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.field = field; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T pojo) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + return innerAccessor.get(inner); + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." + + " fields: " + field + " obj: " + pojo); + } + } + + @Override + public T set(T pojo, F valueToSet) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + field.set(pojo, innerAccessor.set(inner, valueToSet)); + return pojo; + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." 
+ + " fields: " + field + " obj: " + pojo); + } + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + FieldSerializer.serializeField(field, out); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + field = FieldSerializer.deserializeField(in); + } + } + + /** + * There are two versions of ProductFieldAccessor, differing in whether there is an other + * FieldAccessor nested inside. The no inner accessor version is probably a little faster. + */ + static final class SimpleProductFieldAccessor<T, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final TupleSerializerBase<T> serializer; + private final Object[] fields; + private final int length; + + SimpleProductFieldAccessor(int pos, TypeInformation<T> typeInfo, ExecutionConfig config) { + checkNotNull(typeInfo, "typeInfo must not be null."); + int arity = ((TupleTypeInfoBase)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + this.pos = pos; + this.fieldType = ((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos); + this.serializer = (TupleSerializerBase<T>)typeInfo.createSerializer(config); + this.length = this.serializer.getArity(); + this.fields = new Object[this.length]; + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + Product prod = (Product)record; + return (F) prod.productElement(pos); + } + + @Override + public T set(T record, F fieldValue) { + Product prod = (Product)record; + for (int i = 0; i < length; i++) { + fields[i] = prod.productElement(i); + } + fields[pos] = fieldValue; + return serializer.createInstance(fields); + } + } + + + static final class RecursiveProductFieldAccessor<T, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final TupleSerializerBase<T> serializer; + private final Object[] fields; + private final int length; + private final FieldAccessor<R, F> innerAccessor; + + RecursiveProductFieldAccessor(int pos, TypeInformation<T> typeInfo, FieldAccessor<R, F> innerAccessor, ExecutionConfig config) { + int arity = ((TupleTypeInfoBase)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.pos = pos; + this.fieldType = ((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos); + this.serializer = (TupleSerializerBase<T>)typeInfo.createSerializer(config); + this.length = this.serializer.getArity(); + this.fields = new Object[this.length]; + this.innerAccessor = innerAccessor; + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return innerAccessor.get((R)((Product)record).productElement(pos)); + } + + @SuppressWarnings("unchecked") + @Override + public T set(T record, F fieldValue) { + Product prod = (Product) record; + for (int i = 0; i < length; i++) { + fields[i] = prod.productElement(i); + } + fields[pos] = innerAccessor.set((R)fields[pos], fieldValue); + return serializer.createInstance(fields); + } + } +}
