Github user mbalassi commented on a diff in the pull request:
https://github.com/apache/flink/pull/2094#discussion_r88616007
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessorFactory.java
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+
+import java.io.Serializable;
+
+
+/**
+ * Static factories for the {@link FieldAccessor} utilities.
+ */
+@Internal
+public class FieldAccessorFactory implements Serializable {
+
+ /**
+ * Creates a {@link FieldAccessor} for the given field position, which
can be used to get and set
+ * the specified field on instances of this type.
+ *
+ * @param pos The field position (zero-based)
+ * @param config Configuration object
+ * @param <F> The type of the field to access
+ * @return The created FieldAccessor
+ */
+ @Internal
+ public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T>
typeInfo, int pos, ExecutionConfig config){
+
+ // In case of arrays
+ if (typeInfo instanceof BasicArrayTypeInfo || typeInfo
instanceof PrimitiveArrayTypeInfo) {
+ return new FieldAccessor.ArrayFieldAccessor<>(pos,
typeInfo);
+
+ // In case of basic types
+ } else if (typeInfo instanceof BasicTypeInfo) {
+ if (pos != 0) {
+ throw new
CompositeType.InvalidFieldReferenceException("The " + ((Integer)
pos).toString() + ". field selected on a " +
+ "basic type (" + typeInfo.toString() +
"). A field expression on a basic type can only select " +
+ "the 0th field (which means selecting
the entire basic type).");
+ }
+ return (FieldAccessor<T, F>) new
FieldAccessor.SimpleFieldAccessor<>(typeInfo);
+
+ // In case of case classes
+ } else if (typeInfo.isTupleType() && ((TupleTypeInfoBase)
typeInfo).isCaseClass()) {
+ TupleTypeInfoBase tupleTypeInfo = (TupleTypeInfoBase)
typeInfo;
+ TypeInformation<?>[] types = new
TypeInformation<?>[tupleTypeInfo.getArity()];
+ for (int i = 0; i < types.length; i++) {
+ types[i] = tupleTypeInfo.getTypeAt(i);
+ }
+ return new FieldAccessor.ProductFieldAccessor<>(
+ pos, typeInfo, new
FieldAccessor.SimpleFieldAccessor<>((TypeInformation<F>)types[pos]), config);
+
+ // In case of tuples
+ } else if (typeInfo.isTupleType()) {
+ return new
FieldAccessor.SimpleTupleFieldAccessor<>(pos, typeInfo);
+
+ // Default case, PojoType is directed to this statement
+ } else {
+ throw new
CompositeType.InvalidFieldReferenceException("Cannot reference field by
position on " + typeInfo.toString()
+ + "Referencing a field by position is supported
on tuples, case classes, and arrays. "
+ + "Additionally, you can select the 0th field
of a primitive/basic type (e.g. int).");
+ }
+ }
+
+ /**
+ * Creates a {@link FieldAccessor} for the field that is given by a
field expression,
+ * which can be used to get and set the specified field on instances of
this type.
+ *
+ * @param field The field expression
+ * @param config Configuration object
+ * @param <F> The type of the field to access
+ * @return The created FieldAccessor
+ */
+ @Internal
+ public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T>
typeInfo, String field, ExecutionConfig config) {
+
+ // In case of arrays
+ if (typeInfo instanceof BasicArrayTypeInfo || typeInfo
instanceof PrimitiveArrayTypeInfo) {
+ try {
+ return new
FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(field), typeInfo);
+ } catch (NumberFormatException ex) {
+ throw new
CompositeType.InvalidFieldReferenceException
+ ("A field expression on an array must
be an integer index (that might be given as a string).");
+ }
+
+ // In case of basic types
+ } else if (typeInfo instanceof BasicTypeInfo) {
+ try {
+ int pos = field.equals("*") ? 0 :
Integer.parseInt(field);
--- End diff --
Thanks.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---