[
https://issues.apache.org/jira/browse/BEAM-4453?focusedWorklogId=171420&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-171420
]
ASF GitHub Bot logged work on BEAM-4453:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Dec/18 00:03
Start Date: 03/Dec/18 00:03
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #7147: [BEAM-4453] Use
constructors to generate schema POJOs
URL: https://github.com/apache/beam/pull/7147
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
new file mode 100644
index 000000000000..ee6713e358cc
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/CachingFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Factory} that assumes the schema parameter never
changes.
+ *
+ * <p>{@link Factory} objects take the schema as a parameter, as often the
returned type varies by
+ * schema (e.g. sometimes the returned-type is a list that must be in
schema-field order). However
+ * in many cases it's known by the caller that the schema parameter is always
the same across all
+ * calls to create. In these cases we want to save the cost of Schema
comparison (which can be
+ * significant for larger schemas) on each lookup. This wrapper caches the
value returned by the
+ * inner factory, so the schema comparison only need happen on the first
lookup.
+ */
+class CachingFactory<CreatedT> implements Factory<CreatedT> {
+ @Nullable private transient ConcurrentHashMap<Class, CreatedT> cache = null;
+
+ private final Factory<CreatedT> innerFactory;
+
+ public CachingFactory(Factory<CreatedT> innerFactory) {
+ this.innerFactory = innerFactory;
+ }
+
+ @Override
+ public CreatedT create(Class<?> clazz, Schema schema) {
+ if (cache == null) {
+ cache = new ConcurrentHashMap<>();
+ }
+ CreatedT cached = cache.get(clazz);
+ if (cached != null) {
+ return cached;
+ }
+ cached = innerFactory.create(clazz, schema);
+ cache.put(clazz, cached);
+ return cached;
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java
new file mode 100644
index 000000000000..f3514a01eefd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Factory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.Serializable;
+
+/** A Factory interface for schema-related objects for a specific Java type. */
+public interface Factory<T> extends Serializable {
+ T create(Class<?> clazz, Schema schema);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetter.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetter.java
index 0be68e53a893..0fd541fc5537 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetter.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetter.java
@@ -34,6 +34,4 @@
ValueT get(ObjectT object);
String name();
-
- Class type();
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetterFactory.java
index b7bf55598eeb..c4ce63150909 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueGetterFactory.java
@@ -17,18 +17,18 @@
*/
package org.apache.beam.sdk.schemas;
-import java.io.Serializable;
import java.util.List;
/**
* A factory interface for creating {@link
org.apache.beam.sdk.schemas.FieldValueGetter} objects
* corresponding to a class.
*/
-public interface FieldValueGetterFactory extends Serializable {
+public interface FieldValueGetterFactory extends
Factory<List<FieldValueGetter>> {
/**
* Returns a list of {@link org.apache.beam.sdk.schemas.FieldValueGetter}s
for the target class.
*
* <p>The returned list is ordered by the order of matching fields in the
schema.
*/
- List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema);
+ @Override
+ List<FieldValueGetter> create(Class<?> targetClass, Schema schema);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
index 2a45892f68c6..5d9e82bf24a6 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetter.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.schemas;
import java.io.Serializable;
-import java.lang.reflect.Type;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
@@ -37,19 +36,4 @@
/** Returns the name of the field. */
String name();
-
- /** Returns the field type. */
- Class type();
-
- /** If the field is a container type, returns the element type. */
- @Nullable
- Type elementType();
-
- /** If the field is a map type, returns the key type. */
- @Nullable
- Type mapKeyType();
-
- /** If the field is a map type, returns the key type. */
- @Nullable
- Type mapValueType();
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetterFactory.java
index ea2107956547..893ba9cbbe31 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueSetterFactory.java
@@ -17,18 +17,18 @@
*/
package org.apache.beam.sdk.schemas;
-import java.io.Serializable;
import java.util.List;
/**
* A factory interface for creating {@link
org.apache.beam.sdk.schemas.FieldValueSetter} objects
* corresponding to a class.
*/
-public interface FieldValueSetterFactory extends Serializable {
+public interface FieldValueSetterFactory extends
Factory<List<FieldValueSetter>> {
/**
* Returns a list of {@link org.apache.beam.sdk.schemas.FieldValueSetter}s
for the target class.
*
* <p>The returned list is ordered by the order of matching fields in the
schema.
*/
- List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema);
+ @Override
+ List<FieldValueSetter> create(Class<?> targetClass, Schema schema);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
new file mode 100644
index 000000000000..5cccdf67ef3d
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Represents type information for a schema field. */
+@AutoValue
+public abstract class FieldValueTypeInformation implements Serializable {
+ /** Returns the field name. */
+ public abstract String getName();
+
+ /** Returns the field type. */
+ public abstract Class getType();
+
+ /** If the field is a container type, returns the element type. */
+ @Nullable
+ public abstract Type getElementType();
+
+ /** If the field is a map type, returns the key type. */
+ @Nullable
+ public abstract Type getMapKeyType();
+
+ /** If the field is a map type, returns the value type. */
+ @Nullable
+ public abstract Type getMapValueType();
+
+ public static FieldValueTypeInformation of(Field field) {
+ return new AutoValue_FieldValueTypeInformation(
+ field.getName(),
+ field.getType(),
+ getArrayComponentType(field),
+ getMapKeyType(field),
+ getMapValueType(field));
+ }
+
+ public static FieldValueTypeInformation of(TypeInformation typeInformation) {
+ return new AutoValue_FieldValueTypeInformation(
+ typeInformation.getName(),
+ typeInformation.getType().getRawType(),
+ getArrayComponentType(typeInformation),
+ getMapKeyType(typeInformation),
+ getMapValueType(typeInformation));
+ }
+
+ private static Type getArrayComponentType(TypeInformation typeInformation) {
+ return getArrayComponentType(typeInformation.getType());
+ }
+
+ private static Type getArrayComponentType(Field field) {
+ return getArrayComponentType(TypeDescriptor.of(field.getGenericType()));
+ }
+
+ @Nullable
+ private static Type getArrayComponentType(TypeDescriptor valueType) {
+ if (valueType.isArray()) {
+ Type component = valueType.getComponentType().getType();
+ if (!component.equals(byte.class)) {
+ return component;
+ }
+ } else if (valueType.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
+ TypeDescriptor<Collection<?>> collection =
valueType.getSupertype(Collection.class);
+ if (collection.getType() instanceof ParameterizedType) {
+ ParameterizedType ptype = (ParameterizedType) collection.getType();
+ java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+ checkArgument(params.length == 1);
+ return params[0];
+ } else {
+ throw new RuntimeException("Collection parameter is not
parameterized!");
+ }
+ }
+ return null;
+ }
+
+ // If the Field is a map type, returns the key type, otherwise returns a
null reference.
+ @Nullable
+ private static Type getMapKeyType(Field field) {
+ return getMapType(TypeDescriptor.of(field.getGenericType()), 0);
+ }
+
+ @Nullable
+ private static Type getMapKeyType(TypeInformation typeInformation) {
+ return getMapType(typeInformation.getType(), 0);
+ }
+
+ // If the Field is a map type, returns the value type, otherwise returns a
null reference.
+ @Nullable
+ private static Type getMapValueType(Field field) {
+ return getMapType(TypeDescriptor.of(field.getGenericType()), 1);
+ }
+
+ @Nullable
+ private static Type getMapValueType(TypeInformation typeInformation) {
+ return getMapType(typeInformation.getType(), 1);
+ }
+
+ // If the Field is a map type, returns the key or value type (0 is key type,
1 is value).
+ // Otherwise returns a null reference.
+ @SuppressWarnings("unchecked")
+ @Nullable
+ private static Type getMapType(TypeDescriptor valueType, int index) {
+ if (valueType.isSubtypeOf(TypeDescriptor.of(Map.class))) {
+ TypeDescriptor<Collection<?>> map = valueType.getSupertype(Map.class);
+ if (map.getType() instanceof ParameterizedType) {
+ ParameterizedType ptype = (ParameterizedType) map.getType();
+ java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+ return params[index];
+ } else {
+ throw new RuntimeException("Map type is not parameterized! " + map);
+ }
+ }
+ return null;
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformationFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformationFactory.java
new file mode 100644
index 000000000000..8f07a64a557e
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformationFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.util.List;
+
+/** Factory for vending type information for all fields of a class. */
+public interface FieldValueTypeInformationFactory extends
Factory<List<FieldValueTypeInformation>> {
+ @Override
+ List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
new file mode 100644
index 000000000000..2ec1a42e7d07
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.RowWithGetters;
+
+/** Function to convert a {@link Row} to a user type using a creator factory.
*/
+class FromRowUsingCreator<T> implements SerializableFunction<Row, T> {
+ private final Class<T> clazz;
+ private final Factory<SchemaUserTypeCreator> schemaTypeCreatorFactory;
+ private final Factory<List<FieldValueTypeInformation>>
fieldValueTypeInformationFactory;
+
+ public FromRowUsingCreator(
+ Class<T> clazz,
+ UserTypeCreatorFactory schemaTypeUserTypeCreatorFactory,
+ FieldValueTypeInformationFactory fieldValueTypeInformationFactory) {
+ this.clazz = clazz;
+ this.schemaTypeCreatorFactory = new
CachingFactory<>(schemaTypeUserTypeCreatorFactory);
+ this.fieldValueTypeInformationFactory = new
CachingFactory<>(fieldValueTypeInformationFactory);
+ }
+
+ @Override
+ public T apply(Row row) {
+ return fromRow(row, clazz, fieldValueTypeInformationFactory);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <ValueT> ValueT fromRow(
+ Row row, Class<ValueT> clazz, Factory<List<FieldValueTypeInformation>>
typeFactory) {
+ if (row instanceof RowWithGetters) {
+ // Efficient path: simply extract the underlying object instead of
creating a new one.
+ return (ValueT) ((RowWithGetters) row).getGetterTarget();
+ }
+
+ Object[] params = new Object[row.getFieldCount()];
+ Schema schema = row.getSchema();
+ List<FieldValueTypeInformation> typeInformations =
typeFactory.create(clazz, schema);
+ checkState(
+ typeInformations.size() == row.getFieldCount(),
+ "Did not have a matching number of type informations and fields.");
+
+ for (int i = 0; i < row.getFieldCount(); ++i) {
+ FieldType type = schema.getField(i).getType();
+ FieldValueTypeInformation typeInformation = typeInformations.get(i);
+ params[i] =
+ fromValue(
+ type,
+ row.getValue(i),
+ typeInformation.getType(),
+ typeInformation.getElementType(),
+ typeInformation.getMapKeyType(),
+ typeInformation.getMapValueType(),
+ typeFactory);
+ }
+
+ SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz,
schema);
+ return (ValueT) creator.create(params);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ private <ValueT> ValueT fromValue(
+ FieldType type,
+ ValueT value,
+ Type fieldType,
+ Type elemenentType,
+ Type keyType,
+ Type valueType,
+ Factory<List<FieldValueTypeInformation>> typeFactory) {
+ if (value == null) {
+ return null;
+ }
+ if (TypeName.ROW.equals(type.getTypeName())) {
+ return (ValueT) fromRow((Row) value, (Class) fieldType, typeFactory);
+ } else if (TypeName.ARRAY.equals(type.getTypeName())) {
+ return (ValueT)
+ fromListValue(type.getCollectionElementType(), (List) value,
elemenentType, typeFactory);
+ } else if (TypeName.MAP.equals(type.getTypeName())) {
+ return (ValueT)
+ fromMapValue(
+ type.getMapKeyType(),
+ type.getMapValueType(),
+ (Map) value,
+ keyType,
+ valueType,
+ typeFactory);
+ } else {
+ return value;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private <ElementT> List fromListValue(
+ FieldType elementType,
+ List<ElementT> rowList,
+ Type elementClass,
+ Factory<List<FieldValueTypeInformation>> typeFactory) {
+ List list = Lists.newArrayList();
+ for (ElementT element : rowList) {
+ list.add(fromValue(elementType, element, elementClass, null, null, null,
typeFactory));
+ }
+ return list;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<?, ?> fromMapValue(
+ FieldType keyType,
+ FieldType valueType,
+ Map<?, ?> map,
+ Type keyClass,
+ Type valueClass,
+ Factory<List<FieldValueTypeInformation>> typeFactory) {
+ Map newMap = Maps.newHashMap();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Object key = fromValue(keyType, entry.getKey(), keyClass, null, null,
null, typeFactory);
+ Object value =
+ fromValue(valueType, entry.getValue(), valueClass, null, null, null,
typeFactory);
+ newMap.put(key, value);
+ }
+ return newMap;
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index 2e147042d348..af5207275f9b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -17,23 +17,12 @@
*/
package org.apache.beam.sdk.schemas;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Type;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -48,6 +37,43 @@
/** Implementing class should override to return a setter factory. */
abstract FieldValueSetterFactory fieldValueSetterFactory();
+ /** Implementing class should override to return a type-information factory.
*/
+ abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory();
+
+ /**
+ * Implementing class should override to return a constructor factory.
+ *
+ * <p>The default factory uses the default constructor and the setters to
construct an object.
+ */
+ UserTypeCreatorFactory schemaTypeCreatorFactory() {
+ Factory<List<FieldValueSetter>> setterFactory = new
CachingFactory<>(fieldValueSetterFactory());
+ return new UserTypeCreatorFactory() {
+ @Override
+ public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
+ List<FieldValueSetter> setters = setterFactory.create(clazz, schema);
+ return new SchemaUserTypeCreator() {
+ @Override
+ public Object create(Object... params) {
+ Object object;
+ try {
+ object = clazz.getDeclaredConstructor().newInstance();
+ } catch (NoSuchMethodException
+ | IllegalAccessException
+ | InvocationTargetException
+ | InstantiationException e) {
+ throw new RuntimeException("Failed to instantiate object ", e);
+ }
+ for (int i = 0; i < params.length; ++i) {
+ FieldValueSetter setter = setters.get(i);
+ setter.set(object, params[i]);
+ }
+ return object;
+ }
+ };
+ }
+ };
+ }
+
@Override
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T>
typeDescriptor) {
// schemaFor is non deterministic - it might return fields in an arbitrary
order. The reason
@@ -61,163 +87,15 @@
// Since we know that this factory is always called from inside the lambda
with the same schema,
// return a caching factory that caches the first value seen for each
class. This prevents
// having to look up the getter list each time create is called.
- FieldValueGetterFactory getterFactory =
- new FieldValueGetterFactory() {
- @Nullable
- private transient ConcurrentHashMap<Class, List<FieldValueGetter>>
gettersMap = null;
-
- private final FieldValueGetterFactory innerFactory =
fieldValueGetterFactory();
-
- @Override
- public List<FieldValueGetter> createGetters(Class<?> targetClass,
Schema schema) {
- if (gettersMap == null) {
- gettersMap = new ConcurrentHashMap<>();
- }
- List<FieldValueGetter> getters = gettersMap.get(targetClass);
- if (getters != null) {
- return getters;
- }
- getters = innerFactory.createGetters(targetClass, schema);
- gettersMap.put(targetClass, getters);
- return getters;
- }
- };
+ Factory<List<FieldValueGetter>> getterFactory = new
CachingFactory<>(fieldValueGetterFactory());
return o -> Row.withSchema(schema).withFieldValueGetters(getterFactory,
o).build();
}
@Override
@SuppressWarnings("unchecked")
public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T>
typeDescriptor) {
- FieldValueSetterFactory setterFactory =
- new FieldValueSetterFactory() {
- @Nullable
- private volatile ConcurrentHashMap<Class, List<FieldValueSetter>>
settersMap = null;
-
- private final FieldValueSetterFactory innerFactory =
fieldValueSetterFactory();
-
- @Override
- public List<FieldValueSetter> createSetters(Class<?> targetClass,
Schema schema) {
- if (settersMap == null) {
- settersMap = new ConcurrentHashMap<>();
- }
- List<FieldValueSetter> setters = settersMap.get(targetClass);
- if (setters != null) {
- return setters;
- }
- setters = innerFactory.createSetters(targetClass, schema);
- settersMap.put(targetClass, setters);
- return setters;
- }
- };
-
- return r -> {
- if (r instanceof RowWithGetters) {
- // Efficient path: simply extract the underlying POJO instead of
creating a new one.
- return (T) ((RowWithGetters) r).getGetterTarget();
- } else {
- // Use the setters to copy values from the Row to a new instance of
the class.
- return fromRow(r, (Class<T>) typeDescriptor.getType(), setterFactory);
- }
- };
- }
-
- private <T> T fromRow(Row row, Class<T> clazz, FieldValueSetterFactory
setterFactory) {
- T object;
- try {
- object = clazz.getDeclaredConstructor().newInstance();
- } catch (NoSuchMethodException
- | IllegalAccessException
- | InvocationTargetException
- | InstantiationException e) {
- throw new RuntimeException("Failed to instantiate object ", e);
- }
-
- Schema schema = row.getSchema();
- List<FieldValueSetter> setters = setterFactory.createSetters(clazz,
schema);
- checkState(
- setters.size() == row.getFieldCount(),
- "Did not have a matching number of setters and fields.");
-
- // Iterate over the row, and set (possibly recursively) each field in the
underlying object
- // using the setter.
- for (int i = 0; i < row.getFieldCount(); ++i) {
- FieldType type = schema.getField(i).getType();
- FieldValueSetter setter = setters.get(i);
- setter.set(
- object,
- fromValue(
- type,
- row.getValue(i),
- setter.type(),
- setter.elementType(),
- setter.mapKeyType(),
- setter.mapValueType(),
- setterFactory));
- }
- return object;
- }
-
- @SuppressWarnings("unchecked")
- @Nullable
- private <T> T fromValue(
- FieldType type,
- T value,
- Type fieldType,
- Type elemenentType,
- Type keyType,
- Type valueType,
- FieldValueSetterFactory setterFactory) {
- if (value == null) {
- return null;
- }
- if (TypeName.ROW.equals(type.getTypeName())) {
- return (T) fromRow((Row) value, (Class) fieldType, setterFactory);
- } else if (TypeName.ARRAY.equals(type.getTypeName())) {
- return (T)
- fromListValue(
- type.getCollectionElementType(), (List) value, elemenentType,
setterFactory);
- } else if (TypeName.MAP.equals(type.getTypeName())) {
- return (T)
- fromMapValue(
- type.getMapKeyType(),
- type.getMapValueType(),
- (Map) value,
- keyType,
- valueType,
- setterFactory);
- } else {
- return value;
- }
- }
-
- @SuppressWarnings("unchecked")
- private <T> List fromListValue(
- FieldType elementType,
- List<T> rowList,
- Type elementClass,
- FieldValueSetterFactory setterFactory) {
- List list = Lists.newArrayList();
- for (T element : rowList) {
- list.add(fromValue(elementType, element, elementClass, null, null, null,
setterFactory));
- }
- return list;
- }
-
- @SuppressWarnings("unchecked")
- private Map<?, ?> fromMapValue(
- FieldType keyType,
- FieldType valueType,
- Map<?, ?> map,
- Type keyClass,
- Type valueClass,
- FieldValueSetterFactory setterFactory) {
- Map newMap = Maps.newHashMap();
- for (Map.Entry<?, ?> entry : map.entrySet()) {
- Object key = fromValue(keyType, entry.getKey(), keyClass, null, null,
null, setterFactory);
- Object value =
- fromValue(valueType, entry.getValue(), valueClass, null, null, null,
setterFactory);
- newMap.put(key, value);
- }
- return newMap;
+ Class<T> clazz = (Class<T>) typeDescriptor.getType();
+ return new FromRowUsingCreator<>(
+ clazz, schemaTypeCreatorFactory(), fieldValueTypeInformationFactory());
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index cf6d9643750a..1e127b181210 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -21,6 +21,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.utils.JavaBeanGetterFactory;
import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory;
+import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory;
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -53,4 +54,9 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
public FieldValueSetterFactory fieldValueSetterFactory() {
return new JavaBeanSetterFactory();
}
+
+ @Override
+ public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
+ return new JavaBeanTypeInformationFactory();
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index 7cff87f13cf5..c6080e6863e5 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.schemas.utils.POJOUtils;
import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory;
import org.apache.beam.sdk.schemas.utils.PojoValueSetterFactory;
+import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -53,4 +54,14 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
public FieldValueSetterFactory fieldValueSetterFactory() {
return new PojoValueSetterFactory();
}
+
+ @Override
+ public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
+ return new PojoValueTypeInformationFactory();
+ }
+
+ @Override
+ UserTypeCreatorFactory schemaTypeCreatorFactory() {
+ return new PojoTypeUserTypeCreatorFactory();
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
new file mode 100644
index 000000000000..6013f9467f5a
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.lang.reflect.Constructor;
+import org.apache.beam.sdk.schemas.utils.POJOUtils;
+
+/** Vends constructors for POJOs. */
+class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory {
+ @Override
+ public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
+ Constructor<?> constructor = POJOUtils.getConstructor(clazz, schema);
+ return new SchemaUserTypeConstructorCreator(clazz, constructor);
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
new file mode 100644
index 000000000000..c5fae19948e5
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/** An implementation of {@link SchemaUserTypeCreator} that uses a Java
constructor. */
+public class SchemaUserTypeConstructorCreator implements SchemaUserTypeCreator
{
+ private final Class<?> clazz;
+ private final transient Constructor<?> constructor;
+
+ SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor)
{
+ this.clazz = clazz;
+ this.constructor = checkNotNull(constructor);
+ }
+
+ @Override
+ public Object create(Object... params) {
+ checkNotNull(constructor);
+ try {
+ return constructor.newInstance(params);
+ } catch (InstantiationException | IllegalAccessException |
InvocationTargetException e) {
+ throw new RuntimeException("Could not instantiate object " + clazz, e);
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeCreator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeCreator.java
new file mode 100644
index 000000000000..28f5bf8ed7e8
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeCreator.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Internal;
+
+/** A creator interface for user types that have schemas. */
+@Internal
+public interface SchemaUserTypeCreator extends Serializable {
+ Object create(Object... params);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/UserTypeCreatorFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/UserTypeCreatorFactory.java
new file mode 100644
index 000000000000..1e4c902de2df
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/UserTypeCreatorFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+/** A factory for {@link SchemaUserTypeCreator} objects. */
+public interface UserTypeCreatorFactory extends Factory<SchemaUserTypeCreator>
{
+ @Override
+ SchemaUserTypeCreator create(Class<?> clazz, Schema schema);
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index 18c0e174ada5..accbb20dbf7b 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -17,9 +17,6 @@
*/
package org.apache.beam.sdk.schemas.utils;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -27,13 +24,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import net.bytebuddy.dynamic.DynamicType;
-import net.bytebuddy.implementation.FixedValue;
-import net.bytebuddy.implementation.Implementation;
import net.bytebuddy.implementation.bytecode.Duplication;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.StackManipulation.Compound;
@@ -147,9 +141,16 @@ public T convert(TypeDescriptor typeDescriptor) {
* <pre><code>{@literal FieldValueGetter<POJO, List<Integer>>}</code></pre>
*/
static class ConvertType extends TypeConversion<Type> {
+ private boolean returnRawTypes;
+
+ public ConvertType(boolean returnRawTypes) {
+ this.returnRawTypes = returnRawTypes;
+ }
+
@Override
protected Type convertArray(TypeDescriptor<?> type) {
- return createListType(type).getType();
+ TypeDescriptor ret = createListType(type);
+ return returnRawTypes ? ret.getRawType() : ret.getType();
}
@Override
@@ -184,7 +185,7 @@ protected Type convertPrimitive(TypeDescriptor<?> type) {
@Override
protected Type convertDefault(TypeDescriptor<?> type) {
- return type.getType();
+ return returnRawTypes ? type.getRawType() : type.getType();
}
@SuppressWarnings("unchecked")
@@ -507,55 +508,4 @@ protected StackManipulation
convertDefault(TypeDescriptor<?> type) {
return readValue;
}
}
-
- // If the Field is a container type, returns the element type. Otherwise
returns a null reference.
- @SuppressWarnings("unchecked")
- static Implementation getArrayComponentType(TypeDescriptor valueType) {
- if (valueType.isArray()) {
- Type component = valueType.getComponentType().getType();
- if (!component.equals(byte.class)) {
- return FixedValue.reference(component);
- }
- } else if (valueType.isSubtypeOf(TypeDescriptor.of(Collection.class))) {
- TypeDescriptor<Collection<?>> collection =
valueType.getSupertype(Collection.class);
- if (collection.getType() instanceof ParameterizedType) {
- ParameterizedType ptype = (ParameterizedType) collection.getType();
- java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
- checkArgument(params.length == 1);
- return FixedValue.reference(params[0]);
- } else {
- throw new RuntimeException("Collection parameter is not
parameterized!");
- }
- }
- return FixedValue.nullValue();
- }
-
- // If the Field is a map type, returns the key type, otherwise returns a
null reference.
- @Nullable
- static Implementation getMapKeyType(TypeDescriptor valueType) {
- return getMapType(valueType, 0);
- }
-
- // If the Field is a map type, returns the value type, otherwise returns a
null reference.
- @Nullable
- static Implementation getMapValueType(TypeDescriptor valueType) {
- return getMapType(valueType, 1);
- }
-
- // If the Field is a map type, returns the key or value type (0 is key type,
1 is value).
- // Otherwise returns a null reference.
- @SuppressWarnings("unchecked")
- private static Implementation getMapType(TypeDescriptor valueType, int
index) {
- if (valueType.isSubtypeOf(TypeDescriptor.of(Map.class))) {
- TypeDescriptor<Collection<?>> map = valueType.getSupertype(Map.class);
- if (map.getType() instanceof ParameterizedType) {
- ParameterizedType ptype = (ParameterizedType) map.getType();
- java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
- return FixedValue.reference(params[index]);
- } else {
- throw new RuntimeException("Map type is not parameterized! " + map);
- }
- }
- return FixedValue.nullValue();
- }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
index e6edc330cee1..0bb9e995cd71 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
@@ -25,7 +25,7 @@
/** A factory for creating {@link FieldValueGetter} objects for a JavaBean
object. */
public class JavaBeanGetterFactory implements FieldValueGetterFactory {
@Override
- public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema
schema) {
+ public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getGetters(targetClass, schema);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
index b9ac24d38571..e67a58dc6837 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
@@ -25,7 +25,7 @@
/** A factory for creating {@link FieldValueSetter} objects for a JavaBean
object. */
public class JavaBeanSetterFactory implements FieldValueSetterFactory {
@Override
- public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema
schema) {
+ public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getSetters(targetClass, schema);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
new file mode 100644
index 000000000000..1b3826227c8a
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */
+public class JavaBeanTypeInformationFactory implements
FieldValueTypeInformationFactory {
+ @Override
+ public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema
schema) {
+ return JavaBeanUtils.getFieldTypes(targetClass, schema);
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 54747a879e13..8a213b436eba 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -43,6 +43,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
@@ -109,6 +110,33 @@ private static void validateJavaBean(
// Static ByteBuddy instance used by all helpers.
private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+ private static final Map<ClassWithSchema, List<FieldValueTypeInformation>>
CACHED_FIELD_TYPES =
+ Maps.newConcurrentMap();
+
+ public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz,
Schema schema) {
+ return CACHED_FIELD_TYPES.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ try {
+ Map<String, FieldValueTypeInformation> getterMap =
+ ReflectUtils.getMethods(clazz)
+ .stream()
+ .filter(ReflectUtils::isGetter)
+ .map(TypeInformation::forGetter)
+ .map(FieldValueTypeInformation::of)
+ .collect(
+ Collectors.toMap(FieldValueTypeInformation::getName,
Function.identity()));
+ return schema
+ .getFields()
+ .stream()
+ .map(f -> getterMap.get(f.getName()))
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
// The list of getters for a class is cached, so we only create the classes
the first time
// getSetters is called.
private static final Map<ClassWithSchema, List<FieldValueGetter>>
CACHED_GETTERS =
@@ -147,7 +175,7 @@ private static void validateJavaBean(
ByteBuddyUtils.subclassGetterInterface(
BYTE_BUDDY,
getterMethod.getDeclaringClass(),
- new ConvertType().convert(typeInformation.getType()));
+ new ConvertType(false).convert(typeInformation.getType()));
builder = implementGetterMethods(builder, getterMethod);
try {
return builder
@@ -170,8 +198,6 @@ private static void validateJavaBean(
return builder
.method(ElementMatchers.named("name"))
.intercept(FixedValue.reference(typeInformation.getName()))
- .method(ElementMatchers.named("type"))
-
.intercept(FixedValue.reference(typeInformation.getType().getRawType()))
.method(ElementMatchers.named("get"))
.intercept(new InvokeGetterInstruction(method));
}
@@ -214,7 +240,7 @@ private static void validateJavaBean(
ByteBuddyUtils.subclassSetterInterface(
BYTE_BUDDY,
setterMethod.getDeclaringClass(),
- new ConvertType().convert(typeInformation.getType()));
+ new ConvertType(false).convert(typeInformation.getType()));
builder = implementSetterMethods(builder, setterMethod);
try {
return builder
@@ -237,14 +263,6 @@ private static void validateJavaBean(
return builder
.method(ElementMatchers.named("name"))
.intercept(FixedValue.reference(typeInformation.getName()))
- .method(ElementMatchers.named("type"))
-
.intercept(FixedValue.reference(typeInformation.getType().getRawType()))
- .method(ElementMatchers.named("elementType"))
-
.intercept(ByteBuddyUtils.getArrayComponentType(typeInformation.getType()))
- .method(ElementMatchers.named("mapKeyType"))
- .intercept(ByteBuddyUtils.getMapKeyType(typeInformation.getType()))
- .method(ElementMatchers.named("mapValueType"))
- .intercept(ByteBuddyUtils.getMapValueType(typeInformation.getType()))
.method(ElementMatchers.named("set"))
.intercept(new InvokeSetterInstruction(method));
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index f934a4d88bdc..ec87afc3dbff 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -17,22 +17,25 @@
*/
package org.apache.beam.sdk.schemas.utils;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
+import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
+import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.Implementation;
+import net.bytebuddy.implementation.MethodCall;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
import net.bytebuddy.implementation.bytecode.StackManipulation;
@@ -44,6 +47,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
@@ -69,9 +73,30 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
// Static ByteBuddy instance used by all helpers.
private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+ private static final Map<ClassWithSchema, List<FieldValueTypeInformation>>
CACHED_FIELD_TYPES =
+ Maps.newConcurrentMap();
+
+ public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz,
Schema schema) {
+ return CACHED_FIELD_TYPES.computeIfAbsent(
+ new ClassWithSchema(clazz, schema),
+ c -> {
+ Map<String, FieldValueTypeInformation> typeInformationMap =
+ ReflectUtils.getFields(clazz)
+ .stream()
+ .map(FieldValueTypeInformation::of)
+ .collect(
+ Collectors.toMap(FieldValueTypeInformation::getName,
Function.identity()));
+ return schema
+ .getFields()
+ .stream()
+ .map(f -> typeInformationMap.get(f.getName()))
+ .collect(Collectors.toList());
+ });
+ }
+
// The list of getters for a class is cached, so we only create the classes
the first time
// getSetters is called.
- public static final Map<ClassWithSchema, List<FieldValueGetter>>
CACHED_GETTERS =
+ private static final Map<ClassWithSchema, List<FieldValueGetter>>
CACHED_GETTERS =
Maps.newConcurrentMap();
public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema
schema) {
@@ -92,6 +117,60 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
});
}
+ // The list of constructors for a class is cached, so we only create the
classes the first time
+ // getConstructor is called.
+ public static final Map<ClassWithSchema, Constructor> CACHED_CONSTRUCTORS =
+ Maps.newConcurrentMap();
+
+ public static <T> Constructor<? extends T> getConstructor(Class<T> clazz,
Schema schema) {
+ return CACHED_CONSTRUCTORS.computeIfAbsent(
+ new ClassWithSchema(clazz, schema), c -> createConstructor(clazz,
schema));
+ }
+
+ private static <T> Constructor<? extends T> createConstructor(Class<T>
clazz, Schema schema) {
+ // Get the list of class fields ordered by schema.
+ Map<String, Field> fieldMap =
+ ReflectUtils.getFields(clazz)
+ .stream()
+ .collect(Collectors.toMap(Field::getName, Function.identity()));
+ List<Field> fields =
+ schema
+ .getFields()
+ .stream()
+ .map(f -> fieldMap.get(f.getName()))
+ .collect(Collectors.toList());
+
+ List<Type> types =
+ fields
+ .stream()
+ .map(Field::getType)
+ .map(TypeDescriptor::of)
+ // We need raw types back so we can setup the list of constructor
params.
+ .map(new ConvertType(true)::convert)
+ .collect(Collectors.toList());
+
+ try {
+ DynamicType.Builder<? extends T> builder =
+ BYTE_BUDDY
+ .subclass(clazz, Default.NO_CONSTRUCTORS)
+ .defineConstructor(Visibility.PUBLIC)
+ .withParameters(types)
+ .intercept(
+ MethodCall.invoke(clazz.getDeclaredConstructor())
+ .andThen(new ConstructInstruction(fields)));
+
+ Class typeArray[] = types.toArray(new Class[types.size()]);
+ return builder
+ .make()
+ .load(ReflectHelpers.findClassLoader(),
ClassLoadingStrategy.Default.INJECTION)
+ .getLoaded()
+ .getDeclaredConstructor(typeArray);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(
+ "Unable to generate a getter for class " + clazz + " with schema " +
schema);
+ }
+ }
+
/**
* Generate the following {@link FieldValueSetter} class for the {@link
Field}.
*
@@ -111,7 +190,7 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
ByteBuddyUtils.subclassGetterInterface(
BYTE_BUDDY,
field.getDeclaringClass(),
- new ConvertType().convert(TypeDescriptor.of(field.getType())));
+ new
ConvertType(false).convert(TypeDescriptor.of(field.getType())));
builder = implementGetterMethods(builder, field);
try {
return builder
@@ -133,8 +212,6 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
return builder
.method(ElementMatchers.named("name"))
.intercept(FixedValue.reference(field.getName()))
- .method(ElementMatchers.named("type"))
- .intercept(FixedValue.reference(field.getType()))
.method(ElementMatchers.named("get"))
.intercept(new ReadFieldInstruction(field));
}
@@ -184,7 +261,7 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
ByteBuddyUtils.subclassSetterInterface(
BYTE_BUDDY,
field.getDeclaringClass(),
- new ConvertType().convert(TypeDescriptor.of(field.getType())));
+ new
ConvertType(false).convert(TypeDescriptor.of(field.getType())));
builder = implementSetterMethods(builder, field);
try {
return builder
@@ -206,14 +283,6 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
return builder
.method(ElementMatchers.named("name"))
.intercept(FixedValue.reference(field.getName()))
- .method(ElementMatchers.named("type"))
- .intercept(FixedValue.reference(checkNotNull(field.getType())))
- .method(ElementMatchers.named("elementType"))
-
.intercept(ByteBuddyUtils.getArrayComponentType(TypeDescriptor.of(field.getGenericType())))
- .method(ElementMatchers.named("mapKeyType"))
-
.intercept(ByteBuddyUtils.getMapKeyType(TypeDescriptor.of(field.getGenericType())))
- .method(ElementMatchers.named("mapValueType"))
-
.intercept(ByteBuddyUtils.getMapValueType(TypeDescriptor.of(field.getGenericType())))
.method(ElementMatchers.named("set"))
.intercept(new SetFieldInstruction(field));
}
@@ -297,4 +366,56 @@ public ByteCodeAppender appender(final Target
implementationTarget) {
};
}
}
+
+ // Implements a method to construct an object.
+ static class ConstructInstruction implements Implementation {
+ private final List<Field> fields;
+
+ ConstructInstruction(List<Field> fields) {
+ this.fields = fields;
+ }
+
+ @Override
+ public InstrumentedType prepare(InstrumentedType instrumentedType) {
+ return instrumentedType;
+ }
+
+ @Override
+ public ByteCodeAppender appender(final Target implementationTarget) {
+ return (methodVisitor, implementationContext, instrumentedMethod) -> {
+ // this + method parameters.
+ int numLocals = 1 + instrumentedMethod.getParameters().size();
+
+ // Generate code to initialize all member variables.
+ StackManipulation stackManipulation = null;
+ for (int i = 0; i < fields.size(); ++i) {
+ Field field = fields.get(i);
+ // The instruction to read the field.
+ StackManipulation readField =
MethodVariableAccess.REFERENCE.loadFrom(i + 1);
+
+ // Read the object onto the stack.
+ StackManipulation updateField =
+ new StackManipulation.Compound(
+ // This param is offset 0.
+ MethodVariableAccess.REFERENCE.loadFrom(0),
+ // Do any conversions necessary.
+ new ByteBuddyUtils.ConvertValueForSetter(readField)
+ .convert(TypeDescriptor.of(field.getType())),
+ // Now update the field and return void.
+ FieldAccess.forField(new ForLoadedField(field)).write());
+ stackManipulation =
+ (stackManipulation == null)
+ ? updateField
+ : new StackManipulation.Compound(stackManipulation,
updateField);
+ }
+ stackManipulation =
+ (stackManipulation == null)
+ ? MethodReturn.VOID
+ : new StackManipulation.Compound(stackManipulation,
MethodReturn.VOID);
+
+ StackManipulation.Size size = stackManipulation.apply(methodVisitor,
implementationContext);
+ return new Size(size.getMaximalSize(), numLocals);
+ };
+ }
+ }
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
index 03f168454109..275b791318f4 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
@@ -25,7 +25,7 @@
/** A factory for creating {@link FieldValueGetter} objects for a POJO. */
public class PojoValueGetterFactory implements FieldValueGetterFactory {
@Override
- public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema
schema) {
+ public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
return POJOUtils.getGetters(targetClass, schema);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
index e2437d56812e..5e9447a2e04a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
@@ -25,7 +25,7 @@
/** A factory for creating {@link FieldValueSetter} objects for a POJO. */
public class PojoValueSetterFactory implements FieldValueSetterFactory {
@Override
- public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema
schema) {
+ public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
return POJOUtils.getSetters(targetClass, schema);
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
new file mode 100644
index 000000000000..84f9a5e050d6
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A {@link FieldValueTypeInformationFactory} for POJO objects. */
+public class PojoValueTypeInformationFactory implements
FieldValueTypeInformationFactory {
+ @Override
+ public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema
schema) {
+ return POJOUtils.getFieldTypes(targetClass, schema);
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 23256a373b03..ef8d2bd4b90a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -36,7 +36,8 @@
import java.util.stream.Collector;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
+import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -486,7 +487,7 @@ public static Builder withSchema(Schema schema) {
public static class Builder {
private List<Object> values = Lists.newArrayList();
private boolean attached = false;
- @Nullable private FieldValueGetterFactory fieldValueGetterFactory;
+ @Nullable private Factory<List<FieldValueGetter>> fieldValueGetterFactory;
@Nullable private Object getterTarget;
private Schema schema;
@@ -525,7 +526,7 @@ public Builder attachValues(List<Object> values) {
}
public Builder withFieldValueGetters(
- FieldValueGetterFactory fieldValueGetterFactory, Object getterTarget) {
+ Factory<List<FieldValueGetter>> fieldValueGetterFactory, Object
getterTarget) {
this.fieldValueGetterFactory = fieldValueGetterFactory;
this.getterTarget = getterTarget;
return this;
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index c0ea2026474b..4754f2ced89c 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -24,8 +24,8 @@
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Factory;
import org.apache.beam.sdk.schemas.FieldValueGetter;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -39,18 +39,19 @@
* the appropriate fields from the POJO.
*/
public class RowWithGetters extends Row {
- private final FieldValueGetterFactory fieldValueGetterFactory;
+ private final Factory<List<FieldValueGetter>> fieldValueGetterFactory;
private final Object getterTarget;
private final List<FieldValueGetter> getters;
private final Map<Integer, List> cachedLists = Maps.newHashMap();
private final Map<Integer, Map> cachedMaps = Maps.newHashMap();
- RowWithGetters(Schema schema, FieldValueGetterFactory getterFactory, Object getterTarget) {
+ RowWithGetters(
+ Schema schema, Factory<List<FieldValueGetter>> getterFactory, Object getterTarget) {
super(schema);
this.fieldValueGetterFactory = getterFactory;
this.getterTarget = getterTarget;
- this.getters = fieldValueGetterFactory.createGetters(getterTarget.getClass(), schema);
+ this.getters = fieldValueGetterFactory.create(getterTarget.getClass(), schema);
}
@Nullable
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 171420)
Time Spent: 12.5h (was: 12h 20m)
> Provide automatic schema registration for POJOs
> -----------------------------------------------
>
> Key: BEAM-4453
> URL: https://issues.apache.org/jira/browse/BEAM-4453
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Reuven Lax
> Assignee: Reuven Lax
> Priority: Major
> Fix For: 2.6.0
>
> Time Spent: 12.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)