[
https://issues.apache.org/jira/browse/FLINK-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676124#comment-15676124
]
ASF GitHub Bot commented on FLINK-3702:
---------------------------------------
Github user mbalassi commented on a diff in the pull request:
https://github.com/apache/flink/pull/2094#discussion_r88614874
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
---
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by
the user as either an index
+ * or a field expression string. TypeInformation can also be requested for
the field.
+ * The position index might specify a field of a Tuple, an array, or a
simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result
in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an
"innerAccesor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor,
ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ protected TypeInformation fieldType;
+
+ /**
+ * Gets the TypeInformation for the type of the field.
+ * Note: For an array of a primitive type, it returns the corresponding
basic type (Integer for int[]).
+ */
+ @SuppressWarnings("unchecked")
+ public TypeInformation<F> getFieldType() {
+ return fieldType;
+ }
+
+
+ /**
+ * Gets the value of the field (specified in the constructor) of the
given record.
+ * @param record The record on which the field will be accessed
+ * @return The value of the field.
+ */
+ public abstract F get(T record);
+
+ /**
+ * Sets the field (specified in the constructor) of the given record to
the given value.
+ *
+ * Warning: This might modify the original object, or might return a
new object instance.
+ * (This is necessary, because the record might be immutable.)
+ *
+ * @param record The record to modify
+ * @param fieldValue The new value of the field
+ * @return A record that has the given field value. (this might be a
new instance or the original)
+ */
+ public abstract T set(T record, F fieldValue);
+
+
+ //
--------------------------------------------------------------------------------------------------
+
+
+ /**
+ * This is when the entire record is considered as a single field. (eg.
field 0 of a basic type, or a
+ * field of a POJO that is itself some composite type but is not
further decomposed)
+ */
+ final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+ checkNotNull(typeInfo, "typeInfo must not be null.");
+
+ this.fieldType = typeInfo;
+ }
+
+ @Override
+ public T get(T record) {
+ return record;
+ }
+
+ @Override
+ public T set(T record, T fieldValue) {
+ return fieldValue;
+ }
+ }
+
+ final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F>
{
+
+ private static final long serialVersionUID = 1L;
+
+ private final int pos;
+
+ public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
+ if(pos < 0) {
+ throw new
CompositeType.InvalidFieldReferenceException("The " + ((Integer)
pos).toString() + ". field selected on" +
+ " an array, which is an invalid
index.");
+ }
+ checkNotNull(typeInfo, "typeInfo must not be null.");
+
+ this.pos = pos;
+ this.fieldType =
BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public F get(T record) {
+ return (F) Array.get(record, pos);
+ }
+
+ @Override
+ public T set(T record, F fieldValue) {
+ Array.set(record, pos, fieldValue);
+ return record;
+ }
+ }
+
+ /**
+ * There are two versions of TupleFieldAccessor, differing in whether
there is an other
+ * FieldAccessor nested inside. The no inner accessor version is
probably a little faster.
+ */
+ static final class SimpleTupleFieldAccessor<T, F> extends
FieldAccessor<T, F> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int pos;
+
+ SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
+ checkNotNull(typeInfo, "typeInfo must not be null.");
+ int arity = ((TupleTypeInfo)typeInfo).getArity();
+ if(pos < 0 || pos >= arity) {
+ throw new
CompositeType.InvalidFieldReferenceException(
+ "Tried to select " + ((Integer)
pos).toString() + ". field on \"" +
+ typeInfo.toString() + "\", which is an
invalid index.");
+ }
+
+ this.pos = pos;
+ this.fieldType =
((TupleTypeInfo)typeInfo).getTypeAt(pos);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public F get(T record) {
+ final Tuple tuple = (Tuple) record;
+ return (F) tuple.getField(pos);
+ }
+
+ @Override
+ public T set(T record, F fieldValue) {
+ final Tuple tuple = (Tuple) record;
+ tuple.setField(fieldValue, pos);
+ return record;
+ }
+ }
+
+ /**
+ * @param <T> The Tuple type
+ * @param <R> The field type at the first level
+ * @param <F> The field type at the innermost level
+ */
+ static final class RecursiveTupleFieldAccessor<T, R, F> extends
FieldAccessor<T, F> {
--- End diff --
Yep.
> DataStream API PojoFieldAccessor doesn't support nested POJOs
> -------------------------------------------------------------
>
> Key: FLINK-3702
> URL: https://issues.apache.org/jira/browse/FLINK-3702
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Affects Versions: 1.0.0
> Reporter: Robert Metzger
> Assignee: Gabor Gevay
>
> The {{PojoFieldAccessor}} (which is used by {{.sum(String)}} and similar
> methods) doesn't support nested POJOs right now.
> As part of FLINK-3697 I'll add a check for a nested POJO and fail with an
> exception.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)