ccciudatu commented on a change in pull request #13428:
URL: https://github.com/apache/beam/pull/13428#discussion_r540269432



##########
File path: 
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.thrift;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.GetterBasedSchemaProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Builder;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TEnum;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.TUnion;
+import org.apache.thrift.meta_data.EnumMetaData;
+import org.apache.thrift.meta_data.FieldMetaData;
+import org.apache.thrift.meta_data.FieldValueMetaData;
+import org.apache.thrift.meta_data.ListMetaData;
+import org.apache.thrift.meta_data.MapMetaData;
+import org.apache.thrift.meta_data.SetMetaData;
+import org.apache.thrift.meta_data.StructMetaData;
+import org.apache.thrift.protocol.TType;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Schema provider for generated thrift types.
+ *
+ * <ul>
+ *   <li>primitive type mapping is straightforward (e.g. {@link TType#I32} -> {@link
+ *       FieldType#INT32})
+ *   <li>{@link TType#STRING} gets mapped as either {@link FieldType#STRING} 
or {@link
+ *       FieldType#BYTES}, depending on whether the {@link 
FieldValueMetaData#isBinary()} flag is
+ *       set.
+ *   <li>{@link TType#MAP} becomes {@link FieldType#map(FieldType, FieldType) 
a beam map} passing
+ *       the key and value types recursively
+ *   <li>{@link TType#SET} gets translated into a beam {@link 
FieldType#iterable(FieldType)
+ *       iterable}, passing the corresponding element type
+ *   <li>{@link TType#LIST} becomes an {@link FieldType#array(FieldType) 
array} of the corresponding
+ *       element type
+ *   <li>{@link TType#ENUM thrift enums} are converted into {@link 
EnumerationType beam enumeration
+ *       types}
+ *   <li>{@link TUnion thrift union} types get mapped to {@link OneOfType beam 
one-of} types
+ * </ul>
+ *
+ * <p>The mapping logic relies on the available {@link FieldMetaData thrift metadata} introspection
+ * and tries to make as few assumptions about the generated code as possible (i.e. it does not rely
+ * on accessor naming conventions, as the thrift compiler supports options such as "beans" or
+ * "fullcamel"/"nocamel").<br>
+ * However, the following strong assumptions are made by this class:
+ *
+ * <ul>
+ *   <li>all thrift generated classes implement {@link TBase}, except for 
enums which become {@link
+ *       Enum java enums} implementing {@link TEnum}
+ *   <li>all {@link TUnion} types provide static factory methods for each of 
the supported field
+ *       types, with the same name as the field itself and only one such 
method taking a single
+ *       parameter exists.
+ *   <li>all non-union types have a corresponding java field with the same 
name for every field in
+ *       the original thrift source file
+ *   <li>the underlying {@link FieldMetaData#getStructMetaDataMap(Class) 
metadata maps} are {@link
+ *       java.util.EnumMap enum maps}, so the natural order of the field keys 
is preserved
+ * </ul>
+ *
+ * <p>Thrift typedefs for container types (and possibly others) do not 
preserve the full type
+ * information. For this reason, this class allows for {@link #custom() manual 
registration} of such
+ * "lossy" typedefs with their corresponding beam types.
+ */
+@Experimental(Experimental.Kind.SCHEMAS)
+public final class ThriftSchema extends GetterBasedSchemaProvider {
+  /** Shared provider instance, used whenever no custom typedefs have been registered. */
+  private static final ThriftSchema defaultProvider = new ThriftSchema(Collections.emptyMap());
+
+  /** Beam types for thrift typedef names that cannot be resolved from thrift metadata alone. */
+  private final Map<String, FieldType> typedefs;
+
+  private ThriftSchema(Map<String, FieldType> typedefMappings) {
+    typedefs = typedefMappings;
+  }
+
+  /**
+   * Returns the shared schema provider, which maps any thrift type to a Beam schema without any
+   * manually registered typedefs. It assumes that every typedef used in the thrift definitions
+   * keeps enough metadata to infer its beam type, which holds for primitive typedefs and the like.
+   *
+   * @see #custom() for how to manually pass the beam type for container typedefs
+   */
+  public static @NonNull SchemaProvider provider() {
+    return ThriftSchema.defaultProvider;
+  }
+
+  /**
+   * Returns a {@link Customizer} for building a schema provider that maps any thrift type to a Beam
+   * schema. The customizer allows thrift typedef entries that cannot be resolved using the
+   * available metadata to be manually registered with their corresponding beam types.
+   *
+   * <p>E.g. {@code typedef set<string> StringSet} will not carry the element type information and
+   * needs to be manually mapped as {@code ThriftSchema.custom().typedef("StringSet",
+   * FieldType.iterable(FieldType.STRING)).provider()}.
+   *
+   * @return a new, empty {@link Customizer}; call {@link Customizer#provider()} once all typedefs
+   *     have been registered
+   */
+  public static @NonNull Customizer custom() {
+    return new Customizer();
+  }
+
+  /**
+   * Fluent builder for a thrift {@link SchemaProvider} that maps "lossy" thrift typedefs (whose
+   * metadata does not carry enough information to infer a beam type) to explicitly given beam
+   * types. Instances are obtained via {@link ThriftSchema#custom()}.
+   */
+  public static final class Customizer {
+    private final Map<String, FieldType> typedefs = new HashMap<>();
+
+    private Customizer() {}
+
+    /**
+     * Registers the beam type to use for the given thrift typedef name. Registering the same name
+     * again replaces the previous mapping.
+     *
+     * @param thriftTypedefName the typedef name, as declared in the thrift definitions
+     * @param beamType the beam field type the typedef is mapped to
+     * @return this customizer, for chaining
+     * @throws NullPointerException if either argument is {@code null}
+     */
+    public @NonNull Customizer typedef(
+        @NonNull String thriftTypedefName, @NonNull FieldType beamType) {
+      // HashMap accepts nulls silently; fail fast here instead of at schema inference time.
+      typedefs.put(
+          Objects.requireNonNull(thriftTypedefName, "thriftTypedefName"),
+          Objects.requireNonNull(beamType, "beamType"));
+      return this;
+    }
+
+    /**
+     * Builds the schema provider. If no typedefs were registered, the shared default provider is
+     * returned; otherwise a new provider is created from an unmodifiable snapshot of the current
+     * registrations, so later calls to {@link #typedef} on this customizer do not affect it.
+     */
+    public @NonNull SchemaProvider provider() {
+      if (typedefs.isEmpty()) {
+        return defaultProvider;
+      } else {
+        return new ThriftSchema(unmodifiableMap(new HashMap<>(typedefs)));
+      }
+    }
+  }
+
+  /**
+   * Infers the beam schema of a thrift-generated type from the descriptor's raw class.
+   *
+   * @throws IllegalArgumentException if the raw class is not a thrift {@link TBase} class
+   */
+  @Override
+  public <T> @NonNull Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+    final Class<?> rawClass = typeDescriptor.getRawType();
+    return schemaFor(rawClass);
+  }
+
+  private Schema schemaFor(Class<?> targetClass) {
+    if (!TBase.class.isAssignableFrom(targetClass)) {
+      throw new IllegalArgumentException("Expected thrift class but got: " + 
targetClass);
+    }
+    final Stream<Schema.Field> fields =
+        
thriftFieldDescriptors(targetClass).values().stream().map(this::beamField);
+    if (TUnion.class.isAssignableFrom(targetClass)) {
+      return 
OneOfType.create(fields.collect(Collectors.toList())).getOneOfSchema();
+    } else {
+      return fields
+          .reduce(Schema.builder(), Builder::addField, 
ThriftSchema::throwingCombiner)
+          .build();
+    }
+  }
+
+  /**
+   * Combiner for {@link Stream#reduce} calls on sequential streams, where no combine step is
+   * expected. Fails loudly, with a descriptive message, if it is ever invoked.
+   */
+  private static <X> X throwingCombiner(X lhs, X rhs) {
+    throw new IllegalStateException(
+        "Unexpected combine step: this combiner is only meant for sequential streams");
+  }
+
+  /**
+   * Converts a thrift field descriptor into a nullable beam field of the inferred beam type. Any
+   * failure during conversion is rethrown as an {@link IllegalStateException} naming the field.
+   */
+  private Schema.Field beamField(FieldMetaData fieldDescriptor) {
+    try {
+      return Schema.Field.nullable(
+          fieldDescriptor.fieldName, beamType(fieldDescriptor.valueMetaData));
+    } catch (Exception cause) {
+      final String fieldName = fieldDescriptor.fieldName;
+      throw new IllegalStateException(
+          "Could not infer beam type for thrift field: " + fieldName, cause);
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public @NonNull List<FieldValueGetter> fieldValueGetters(
+      @NonNull Class<?> targetClass, @NonNull Schema schema) {
+    return thriftFieldDescriptors(targetClass).keySet().stream()
+        .map(FieldExtractor::new)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public @NonNull List<FieldValueTypeInformation> fieldValueTypeInformations(
+      @NonNull Class<?> targetClass, @NonNull Schema schema) {
+    return thriftFieldDescriptors(targetClass).values().stream()
+        .map(descriptor -> fieldValueTypeInfo(targetClass, 
descriptor.fieldName))
+        .collect(Collectors.toList());
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends 
TBase<T, FieldT>>
+      Map<FieldT, FieldMetaData> thriftFieldDescriptors(Class<?> targetClass) {
+    return (Map<FieldT, FieldMetaData>) 
FieldMetaData.getStructMetaDataMap((Class<T>) targetClass);
+  }
+
+  private FieldValueTypeInformation fieldValueTypeInfo(Class<?> type, String 
fieldName) {
+    if (TUnion.class.isAssignableFrom(type)) {
+      final List<Method> factoryMethods =
+          Stream.of(type.getDeclaredMethods())
+              .filter(m -> m.getName().equals(fieldName))
+              .filter(m -> m.getModifiers() == (Modifier.PUBLIC | 
Modifier.STATIC))
+              .filter(m -> m.getParameterCount() == 1)
+              .filter(m -> m.getReturnType() == type)
+              .collect(Collectors.toList());
+      if (factoryMethods.isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "No suitable static factory method: %s.%s(...)", 
type.getName(), fieldName));
+      }
+      if (factoryMethods.size() > 1) {
+        throw new IllegalStateException("Overloaded factory methods: " + 
factoryMethods);

Review comment:
       This should never happen with generated code, so we don't need to bother 
keeping track of (or inferring) the actual parameter type in order to choose the 
proper factory method here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to