Github user mbalassi commented on a diff in the pull request: https://github.com/apache/flink/pull/2094#discussion_r88615630 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.util.typeutils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.operators.Keys; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; +import org.apache.flink.api.java.typeutils.runtime.FieldSerializer; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import scala.Product; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * These classes encapsulate the logic of accessing a field specified by the user as either an index + * or a field expression string. TypeInformation can also be requested for the field. + * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field"). + * + * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors. + * These penetrate one layer, and then delegate the rest of the work to an "innerAccesor". + * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor) + */ +@PublicEvolving +public abstract class FieldAccessor<T, F> implements Serializable { + + private static final long serialVersionUID = 1L; + + protected TypeInformation fieldType; + + /** + * Gets the TypeInformation for the type of the field. + * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]). 
+ */ + @SuppressWarnings("unchecked") + public TypeInformation<F> getFieldType() { + return fieldType; + } + + + /** + * Gets the value of the field (specified in the constructor) of the given record. + * @param record The record on which the field will be accessed + * @return The value of the field. + */ + public abstract F get(T record); + + /** + * Sets the field (specified in the constructor) of the given record to the given value. + * + * Warning: This might modify the original object, or might return a new object instance. + * (This is necessary, because the record might be immutable.) + * + * @param record The record to modify + * @param fieldValue The new value of the field + * @return A record that has the given field value. (this might be a new instance or the original) + */ + public abstract T set(T record, F fieldValue); + + + // -------------------------------------------------------------------------------------------------- + + + /** + * This is when the entire record is considered as a single field. (eg. field 0 of a basic type, or a + * field of a POJO that is itself some composite type but is not further decomposed) + */ + final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> { + + private static final long serialVersionUID = 1L; + + public SimpleFieldAccessor(TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.fieldType = typeInfo; + } + + @Override + public T get(T record) { + return record; + } + + @Override + public T set(T record, T fieldValue) { + return fieldValue; + } + } + + final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + + public ArrayFieldAccessor(int pos, TypeInformation typeInfo) { + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". 
field selected on" + + " an array, which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + + this.pos = pos; + this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType()); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return (F) Array.get(record, pos); + } + + @Override + public T set(T record, F fieldValue) { + Array.set(record, pos, fieldValue); + return record; + } + } + + /** + * There are two versions of TupleFieldAccessor, differing in whether there is an other + * FieldAccessor nested inside. The no inner accessor version is probably a little faster. + */ + static final class SimpleTupleFieldAccessor<T, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + + SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + this.pos = pos; + this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos); + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + final Tuple tuple = (Tuple) record; + return (F) tuple.getField(pos); + } + + @Override + public T set(T record, F fieldValue) { + final Tuple tuple = (Tuple) record; + tuple.setField(fieldValue, pos); + return record; + } + } + + /** + * @param <T> The Tuple type + * @param <R> The field type at the first level + * @param <F> The field type at the innermost level + */ + static final class RecursiveTupleFieldAccessor<T, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final FieldAccessor<R, F> innerAccessor; + + RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> innerAccessor, TypeInformation<T> typeInfo) { + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + int arity = ((TupleTypeInfo)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + + if(pos < 0) { + throw new CompositeType.InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". 
field."); + } + + this.pos = pos; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T record) { + final Tuple tuple = (Tuple) record; + final R inner = tuple.getField(pos); + return innerAccessor.get(inner); + } + + @Override + public T set(T record, F fieldValue) { + final Tuple tuple = (Tuple) record; + final R inner = tuple.getField(pos); + tuple.setField(innerAccessor.set(inner, fieldValue), pos); + return record; + } + } + + /** + * @param <T> The POJO type + * @param <R> The field type at the first level + * @param <F> The field type at the innermost level + */ + static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private transient Field field; + private final FieldAccessor<R, F> innerAccessor; + + PojoFieldAccessor(Field field, FieldAccessor<R, F> innerAccessor) { + checkNotNull(field, "field must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.field = field; + this.innerAccessor = innerAccessor; + this.fieldType = innerAccessor.fieldType; + } + + @Override + public F get(T pojo) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + return innerAccessor.get(inner); + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." 
+ + " fields: " + field + " obj: " + pojo); + } + } + + @Override + public T set(T pojo, F valueToSet) { + try { + @SuppressWarnings("unchecked") + final R inner = (R)field.get(pojo); + field.set(pojo, innerAccessor.set(inner, valueToSet)); + return pojo; + } catch (IllegalAccessException iaex) { + // The Field class is transient and when deserializing its value we also make it accessible + throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject." + + " fields: " + field + " obj: " + pojo); + } + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + FieldSerializer.serializeField(field, out); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + field = FieldSerializer.deserializeField(in); + } + } + + static final class ProductFieldAccessor<T, R, F> extends FieldAccessor<T, F> { + + private static final long serialVersionUID = 1L; + + private final int pos; + private final TupleSerializerBase<T> serializer; + private final Object[] fields; + private final int length; + private final FieldAccessor<R, F> innerAccessor; + + ProductFieldAccessor(int pos, TypeInformation<T> typeInfo, FieldAccessor<R, F> innerAccessor, ExecutionConfig config) { + int arity = ((TupleTypeInfoBase)typeInfo).getArity(); + if(pos < 0 || pos >= arity) { + throw new CompositeType.InvalidFieldReferenceException( + "Tried to select " + ((Integer) pos).toString() + ". 
field on \"" + + typeInfo.toString() + "\", which is an invalid index."); + } + checkNotNull(typeInfo, "typeInfo must not be null."); + checkNotNull(innerAccessor, "innerAccessor must not be null."); + + this.pos = pos; + this.fieldType = ((TupleTypeInfoBase<T>)typeInfo).getTypeAt(pos); + this.serializer = (TupleSerializerBase<T>)typeInfo.createSerializer(config); + this.length = this.serializer.getArity(); + this.fields = new Object[this.length]; + this.innerAccessor = innerAccessor; + } + + @SuppressWarnings("unchecked") + @Override + public F get(T record) { + return innerAccessor.get((R)((Product)record).productElement(pos)); + } + + @SuppressWarnings("unchecked") + @Override + public T set(T record, F fieldValue) { + Product prod = (Product)record; + for (int i = 0; i < length; i++) { + fields[i] = prod.productElement(i); + } + fields[pos] = innerAccessor.set((R)fields[pos], fieldValue); + return serializer.createInstance(fields); + } + } + + + // -------------------------------------------------------------------------------------------------- --- End diff -- Good idea.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---