[
https://issues.apache.org/jira/browse/BEAM-11338?focusedWorklogId=522789&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-522789
]
ASF GitHub Bot logged work on BEAM-11338:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Dec/20 16:46
Start Date: 10/Dec/20 16:46
Worklog Time Spent: 10m
Work Description: ccciudatu commented on a change in pull request #13428:
URL: https://github.com/apache/beam/pull/13428#discussion_r540269432
##########
File path:
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.thrift;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Builder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TEnum;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.TUnion;
+import org.apache.thrift.meta_data.EnumMetaData;
+import org.apache.thrift.meta_data.FieldMetaData;
+import org.apache.thrift.meta_data.FieldValueMetaData;
+import org.apache.thrift.meta_data.ListMetaData;
+import org.apache.thrift.meta_data.MapMetaData;
+import org.apache.thrift.meta_data.SetMetaData;
+import org.apache.thrift.meta_data.StructMetaData;
+import org.apache.thrift.protocol.TType;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Schema provider for generated thrift types.
+ *
+ * <ul>
+ * <li>primitive type mapping is straight-forward (e.g. {@link TType#I32} ->
{@link
+ * FieldType#INT32})
+ * <li>{@link TType#STRING} gets mapped as either {@link FieldType#STRING}
or {@link
+ * FieldType#BYTES}, depending on whether the {@link
FieldValueMetaData#isBinary()} flag is
+ * set.
+ * <li>{@link TType#MAP} becomes {@link FieldType#map(FieldType, FieldType)
a beam map} passing
+ * the key and value types recursively
+ * <li>{@link TType#SET} gets translated into a beam {@link
FieldType#iterable(FieldType)
+ * iterable}, passing the corresponding element type
+ * <li>{@link TType#LIST} becomes an {@link FieldType#array(FieldType)
array} of the corresponding
+ * element type
+ * <li>{@link TType#ENUM thrift enums} are converted into {@link
EnumerationType beam enumeration
+ * types}
+ * <li>{@link TUnion thrift union} types get mapped to {@link OneOfType beam
one-of} types
+ * </ul>
+ *
+ * <p>The mapping logic relies on the available {@link FieldMetaData thrift
metadata} introspection
+ * and tries to make as few assumptions about the generated code as possible
(i.e. does not rely on
+ * accessor naming convention, as the thrift compiler supports options such as
"beans" or
+ * "fullcamel"/"nocamel").<br>
+ * However, the following strong assumptions are made by this class:
+ *
+ * <ul>
+ * <li>all thrift generated classes implement {@link TBase}, except for
enums which become {@link
+ * Enum java enums} implementing {@link TEnum}
+ * <li>all {@link TUnion} types provide static factory methods for each of
the supported field
+ * types, with the same name as the field itself and only one such
method taking a single
+ * parameter exists.
+ * <li>all non-union types have a corresponding java field with the same
name for every field in
+ * the original thrift source file
+ * <li>the underlying {@link FieldMetaData#getStructMetaDataMap(Class)
metadata maps} are {@link
+ * java.util.EnumMap enum maps}, so the natural order of the field keys
is preserved
+ * </ul>
+ *
+ * <p>Thrift typedefs for container types (and possibly others) do not
preserve the full type
+ * information. For this reason, this class allows for {@link #custom() manual
registration} of such
+ * "lossy" typedefs with their corresponding beam types.
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+public final class ThriftSchema extends GetterBasedSchemaProvider {
+ private static final ThriftSchema defaultProvider = new
ThriftSchema(Collections.emptyMap());
+
+ private final Map<String, FieldType> typedefs;
+
+ private ThriftSchema(Map<String, FieldType> typedefs) {
+ this.typedefs = typedefs;
+ }
+
+ /**
+ * Schema provider that maps any thrift type to a Beam schema, assuming that
any typedefs that
+ * might have been used in the thrift definitions will preserve all required
metadata to infer the
+ * beam type (which is the case for any primitive typedefs and the like).
+ *
+ * @see #custom() for how to manually pass the beam type for container
typedefs
+ */
+ public static @NonNull SchemaProvider provider() {
+ return defaultProvider;
+ }
+
+ /**
+ * Builds a schema provider that maps any thrift type to a Beam schema,
allowing for custom thrift
+ * typedef entries (which cannot be resolved using the available metadata)
to be manually
+ * registered with their corresponding beam types.
+ *
+ * <p>E.g. {@code typedef set<string> StringSet} will not carry the element
type information and
+ * needs to be manually mapped here as {@code
.custom().typedef("StringSet",
+ * FieldType.iterable(FieldType.STRING)).provider()}.
+ */
+ public static @NonNull Customizer custom() {
+ return new Customizer();
+ }
+
+ public static final class Customizer {
+ private final Map<String, FieldType> typedefs = new HashMap<>();
+
+ private Customizer() {}
+
+ public @NonNull Customizer typedef(
+ @NonNull String thriftTypedefName, @NonNull FieldType beamType) {
+ typedefs.put(thriftTypedefName, beamType);
+ return this;
+ }
+
+ public @NonNull SchemaProvider provider() {
+ if (typedefs.isEmpty()) {
+ return defaultProvider;
+ } else {
+ return new ThriftSchema(unmodifiableMap(new HashMap<>(typedefs)));
+ }
+ }
+ }
+
+ @Override
+ public <T> @NonNull Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+ return schemaFor(typeDescriptor.getRawType());
+ }
+
+ private Schema schemaFor(Class<?> targetClass) {
+ if (!TBase.class.isAssignableFrom(targetClass)) {
+ throw new IllegalArgumentException("Expected thrift class but got: " +
targetClass);
+ }
+ final Stream<Schema.Field> fields =
+
thriftFieldDescriptors(targetClass).values().stream().map(this::beamField);
+ if (TUnion.class.isAssignableFrom(targetClass)) {
+ return
OneOfType.create(fields.collect(Collectors.toList())).getOneOfSchema();
+ } else {
+ return fields
+ .reduce(Schema.builder(), Builder::addField,
ThriftSchema::throwingCombiner)
+ .build();
+ }
+ }
+
+ private static <X> X throwingCombiner(X lhs, X rhs) {
+ throw new IllegalStateException();
+ }
+
+ private Schema.Field beamField(FieldMetaData fieldDescriptor) {
+ try {
+ final FieldType type = beamType(fieldDescriptor.valueMetaData);
+ return Schema.Field.nullable(fieldDescriptor.fieldName, type);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ "Could not infer beam type for thrift field: " +
fieldDescriptor.fieldName, e);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public @NonNull List<FieldValueGetter> fieldValueGetters(
+ @NonNull Class<?> targetClass, @NonNull Schema schema) {
+ return thriftFieldDescriptors(targetClass).keySet().stream()
+ .map(FieldExtractor::new)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public @NonNull List<FieldValueTypeInformation> fieldValueTypeInformations(
+ @NonNull Class<?> targetClass, @NonNull Schema schema) {
+ return thriftFieldDescriptors(targetClass).values().stream()
+ .map(descriptor -> fieldValueTypeInfo(targetClass,
descriptor.fieldName))
+ .collect(Collectors.toList());
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends
TBase<T, FieldT>>
+ Map<FieldT, FieldMetaData> thriftFieldDescriptors(Class<?> targetClass) {
+ return (Map<FieldT, FieldMetaData>)
FieldMetaData.getStructMetaDataMap((Class<T>) targetClass);
+ }
+
+ private FieldValueTypeInformation fieldValueTypeInfo(Class<?> type, String
fieldName) {
+ if (TUnion.class.isAssignableFrom(type)) {
+ final List<Method> factoryMethods =
+ Stream.of(type.getDeclaredMethods())
+ .filter(m -> m.getName().equals(fieldName))
+ .filter(m -> m.getModifiers() == (Modifier.PUBLIC |
Modifier.STATIC))
+ .filter(m -> m.getParameterCount() == 1)
+ .filter(m -> m.getReturnType() == type)
+ .collect(Collectors.toList());
+ if (factoryMethods.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "No suitable static factory method: %s.%s(...)",
type.getName(), fieldName));
+ }
+ if (factoryMethods.size() > 1) {
+ throw new IllegalStateException("Overloaded factory methods: " +
factoryMethods);
Review comment:
This should never happen with generated code, so we don't need to bother
keeping track of (or inferring) the actual parameter type in order to choose
the proper factory method here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 522789)
Time Spent: 2.5h (was: 2h 20m)
> Beam schema for thrift data
> ---------------------------
>
> Key: BEAM-11338
> URL: https://issues.apache.org/jira/browse/BEAM-11338
> Project: Beam
> Issue Type: New Feature
> Components: io-java-utilities
> Reporter: Costi Ciudatu
> Priority: P2
> Labels: thrift
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Define a SchemaProvider that can handle thrift objects, the same way as
> ProtoMessageSchema handles protobuf.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)