[ 
https://issues.apache.org/jira/browse/BEAM-7274?focusedWorklogId=392086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-392086
 ]

ASF GitHub Bot logged work on BEAM-7274:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Feb/20 20:57
            Start Date: 24/Feb/20 20:57
    Worklog Time Spent: 10m 
      Work Description: alexvanboxel commented on pull request #10502: 
[BEAM-7274] Add DynamicMessage Schema support
URL: https://github.com/apache/beam/pull/10502#discussion_r383509654
 
 

 ##########
 File path: 
sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoDynamicMessageSchema.java
 ##########
 @@ -0,0 +1,852 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.protobuf;
+
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getFieldNumber;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMapKeyMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMapValueMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.getMessageName;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withFieldNumber;
+import static 
org.apache.beam.sdk.extensions.protobuf.ProtoSchemaTranslator.withMessageName;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+@Experimental(Experimental.Kind.SCHEMAS)
+public class ProtoDynamicMessageSchema<T> implements Serializable {
+  public static final long serialVersionUID = 1L;
+
+  /**
+   * Context of the schema. The context can be generated from a source schema or from descriptors.
+   * Whether a Row can be converted back to a proto message depends on the type of context.
+   */
+  private final Context context;
+
+  /**
+   * The SchemaCoder holds the resolved schema and the to/from Row functions. Transient: rebuilt by
+   * readResolve() from the serialized context.
+   */
+  private transient SchemaCoder schemaCoder;
+
+  /** List of field converters, one for each field in the row (transient, rebuilt by readResolve). */
+  private transient List<Convert> converters;
+
+  /** Creates a schema backed by a descriptor context for {@code messageName} in {@code domain}. */
+  private ProtoDynamicMessageSchema(String messageName, ProtoDomain domain) {
+    this(new DescriptorContext(messageName, domain));
+  }
+
+  /**
+   * Stores the serializable context and eagerly builds the transient converters and schema coder
+   * from it.
+   */
+  private ProtoDynamicMessageSchema(Context context) {
+    this.context = context;
+    readResolve();
+  }
+
+  /**
+   * Create a new ProtoDynamicMessageSchema from a {@link ProtoDomain} and for 
a message. The
+   * message need to be in the domain and needs to be the fully qualified name.
+   */
+  public static ProtoDynamicMessageSchema forDescriptor(ProtoDomain domain, 
String messageName) {
+    return new ProtoDynamicMessageSchema(messageName, domain);
+  }
+
+  /**
+   * Create a new ProtoDynamicMessageSchema from a {@link ProtoDomain} and for 
a descriptor. The
+   * descriptor is only used for it's name, that name will be used for a 
search in the domain.
+   */
+  public static ProtoDynamicMessageSchema<DynamicMessage> forDescriptor(
+      ProtoDomain domain, Descriptors.Descriptor descriptor) {
+    return new ProtoDynamicMessageSchema<>(descriptor.getFullName(), domain);
+  }
+
+  /** Creates a schema for the nested row {@code field}, using the sub-context of {@code context}. */
+  static ProtoDynamicMessageSchema<?> forContext(Context context, Schema.Field field) {
+    Context subContext = context.getSubContext(field);
+    return new ProtoDynamicMessageSchema<>(subContext);
+  }
+
+  /**
+   * Creates a schema backed by a plain {@link Context} with {@link Message} as base class. Such a
+   * context cannot create message builders, so it is meant for message-to-row conversion.
+   */
+  static ProtoDynamicMessageSchema<Message> forSchema(Schema schema) {
+    Context<Message> messageContext = new Context<>(schema, Message.class);
+    return new ProtoDynamicMessageSchema<>(messageContext);
+  }
+
+  /**
+   * Rebuilds the transient converters and schema coder from the serialized {@link Context}. Called
+   * by Java serialization after deserialization and explicitly from the constructors.
+   */
+  private Object readResolve() {
+    Schema schema = context.getSchema();
+    converters = createConverters(schema);
+    schemaCoder =
+        SchemaCoder.of(
+            schema,
+            TypeDescriptor.of(context.getBaseClass()),
+            new MessageToRowFunction(),
+            new RowToMessageFunction());
+    return this;
+  }
+
+  /**
+   * Selects the converter for {@code field}. Well-known protobuf wrapper messages are recognised
+   * first by message name; otherwise the choice depends on the Beam field type.
+   *
+   * @throws IllegalStateException for unsupported field types or logical types
+   */
+  Convert createConverter(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    Convert wrapperConverter = createWrapperConverter(field, getMessageName(fieldType));
+    if (wrapperConverter != null) {
+      return wrapperConverter;
+    }
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+      case INT16:
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+      case BOOLEAN:
+        return new PrimitiveConvert(field);
+      case BYTES:
+        return new BytesConvert(field);
+      case ARRAY:
+      case ITERABLE:
+        return new ArrayConvert(this, field);
+      case MAP:
+        return new MapConvert(this, field);
+      case LOGICAL_TYPE:
+        return createLogicalTypeConverter(field);
+      case ROW:
+        return new MessageConvert(this, field);
+      default:
+        throw new IllegalStateException("Unexpected value: " + fieldType);
+    }
+  }
+
+  /**
+   * Returns a converter for a well-known protobuf wrapper message ({@code google.protobuf.*Value}),
+   * or {@code null} when {@code messageName} is absent or is not such a wrapper.
+   */
+  private Convert createWrapperConverter(Schema.Field field, String messageName) {
+    if (messageName == null || messageName.isEmpty()) {
+      return null;
+    }
+    // The value converters only derive the field number (1) from this synthetic field, so the
+    // BOOLEAN type appears to be a placeholder for every wrapper kind.
+    Schema.Field valueField =
+        Schema.Field.of("value", withFieldNumber(Schema.FieldType.BOOLEAN, 1));
+    switch (messageName) {
+      case "google.protobuf.StringValue":
+      case "google.protobuf.DoubleValue":
+      case "google.protobuf.FloatValue":
+      case "google.protobuf.BoolValue":
+      case "google.protobuf.Int64Value":
+      case "google.protobuf.Int32Value":
+      case "google.protobuf.UInt64Value":
+      case "google.protobuf.UInt32Value":
+        return new WrapperConvert(field, new PrimitiveConvert(valueField));
+      case "google.protobuf.BytesValue":
+        return new WrapperConvert(field, new BytesConvert(valueField));
+      default:
+        // Timestamp, Duration and all other messages fall through to the type-based dispatch.
+        return null;
+    }
+  }
+
+  /** Selects the converter for a field whose type is a logical type, keyed by its identifier. */
+  private Convert createLogicalTypeConverter(Schema.Field field) {
+    Schema.FieldType fieldType = field.getType();
+    String identifier = fieldType.getLogicalType().getIdentifier();
+    switch (identifier) {
+      case ProtoSchemaLogicalTypes.Fixed32.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.Fixed64.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.SFixed32.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.SFixed64.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.SInt32.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.SInt64.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.UInt32.IDENTIFIER:
+      case ProtoSchemaLogicalTypes.UInt64.IDENTIFIER:
+        return new LogicalTypeConvert(field, fieldType.getLogicalType());
+      case NanosInstant.IDENTIFIER:
+        return new TimestampConvert(field);
+      case NanosDuration.IDENTIFIER:
+        return new DurationConvert(field);
+      case EnumerationType.IDENTIFIER:
+        return new EnumConvert(field, fieldType.getLogicalType());
+      case OneOfType.IDENTIFIER:
+        return new OneOfConvert(this, field, fieldType.getLogicalType());
+      default:
+        throw new IllegalStateException("Unexpected logical type : " + identifier);
+    }
+  }
+
+  /** Builds one converter per field of {@code schema}, in schema field order. */
+  private List<Convert> createConverters(Schema schema) {
+    List<Schema.Field> fields = schema.getFields();
+    List<Convert> result = new ArrayList<>(fields.size());
+    for (Schema.Field f : fields) {
+      result.add(createConverter(f));
+    }
+    return result;
+  }
+
+  /** Returns the Beam schema resolved for the proto message. */
+  public Schema getSchema() {
+    return getSchemaCoder().getSchema();
+  }
+
+  /** Returns the coder that bundles the schema with the to/from Row functions. */
+  public SchemaCoder<T> getSchemaCoder() {
+    return this.schemaCoder;
+  }
+
+  /** Returns the function that converts a proto message into a {@link Row}. */
+  public SerializableFunction<T, Row> getToRowFunction() {
+    return getSchemaCoder().getToRowFunction();
+  }
+
+  /** Returns the function that converts a {@link Row} back into a proto message. */
+  public SerializableFunction<Row, T> getFromRowFunction() {
+    return getSchemaCoder().getFromRowFunction();
+  }
+
+  /**
+   * Context that only has enough information to convert a proto message to a Row. This can be used
+   * for arbitrary conversions, like decoding messages in proto options.
+   *
+   * @param <T> base class of the proto messages handled by this context
+   */
+  static class Context<T> implements Serializable {
+    /** Beam schema that the proto messages are converted to. */
+    private final Schema schema;
+
+    /**
+     * Base class for the protobuf message. Normally this is DynamicMessage, but because this schema
+     * class is also used to decode protobuf options, it can also be a regular Message class.
+     * Assigned once in the constructor, hence final.
+     */
+    private final Class<T> baseClass;
+
+    Context(Schema schema, Class<T> baseClass) {
+      this.schema = schema;
+      this.baseClass = baseClass;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+
+    public Class<T> getBaseClass() {
+      return baseClass;
+    }
+
+    /**
+     * Creates a builder for the message type of this context. A plain context has no descriptor,
+     * so this implementation always fails.
+     *
+     * @throws IllegalStateException always; descriptor-aware subclasses override this method
+     */
+    public DynamicMessage.Builder invokeNewBuilder() {
+      throw new IllegalStateException("Should not be calling invokeNewBuilder");
+    }
+
+    /**
+     * Returns the context for the nested row {@code field}: again a plain context, with {@link
+     * Message} as base class.
+     */
+    public Context getSubContext(Schema.Field field) {
+      return new Context<>(field.getType().getRowSchema(), Message.class);
+    }
+  }
+
+  /**
+   * Context that contains the full {@link ProtoDomain} and a reference to the message name. The full
+   * domain is needed to convert Rows back into the original proto messages.
+   */
+  static class DescriptorContext extends Context<DynamicMessage> {
+    /** Fully qualified name of the message inside {@link #domain}. */
+    private final String messageName;
+
+    private final ProtoDomain domain;
+
+    /** Lazily resolved; transient, so it is looked up again after deserialization. */
+    private transient Descriptors.Descriptor descriptor;
+
+    DescriptorContext(String messageName, ProtoDomain domain) {
+      super(
+          ProtoSchemaTranslator.getSchema(domain.getDescriptor(messageName)),
+          DynamicMessage.class);
+      this.messageName = messageName;
+      this.domain = domain;
+    }
+
+    /** Creates an empty {@link DynamicMessage} builder for this context's message type. */
+    @Override
+    public DynamicMessage.Builder invokeNewBuilder() {
+      Descriptors.Descriptor resolved = descriptor;
+      if (resolved == null) {
+        resolved = domain.getDescriptor(messageName);
+        descriptor = resolved;
+      }
+      return DynamicMessage.newBuilder(resolved);
+    }
+
+    /** Returns a descriptor-backed context, in the same domain, for the nested field's message. */
+    @Override
+    public Context getSubContext(Schema.Field field) {
+      String nestedMessageName = getMessageName(field.getType());
+      return new DescriptorContext(nestedMessageName, domain);
+    }
+  }
+
+  /**
+   * Base converter class for converting from proto values to row values. The 
converter mainly works
+   * on fields in proto messages but also has methods to convert individual 
elements (example, for
+   * elements in Lists or Maps).
+   */
+  abstract static class Convert<ValueT, InT> {
+    private int number;
+
+    Convert(Schema.Field field) {
+      try {
+        this.number = getFieldNumber(field.getType());
+      } catch (NumberFormatException e) {
+        this.number = -1;
+      }
+    }
+
+    FieldDescriptor getFieldDescriptor(Message message) {
+      return message.getDescriptorForType().findFieldByNumber(number);
+    }
+
+    FieldDescriptor getFieldDescriptor(Message.Builder message) {
+      return message.getDescriptorForType().findFieldByNumber(number);
+    }
+
+    /** Get a proto field and convert it into a row value. */
+    abstract Object getFromProtoMessage(Message message);
+
+    /** Convert a proto value into a row value. */
+    abstract ValueT convertFromProtoValue(Object object);
+
+    /** Convert a row value and set it on a proto message. */
+    abstract void setOnProtoMessage(Message.Builder object, InT value);
+
+    /** Convert a row value into a proto value. */
+    abstract Object convertToProtoValue(FieldDescriptor fieldDescriptor, 
Object value);
+  }
+
+  /**
+   * Converter for primitive proto values (numbers, strings and booleans). Values are passed through
+   * as-is in both directions.
+   */
+  static class PrimitiveConvert extends Convert<Object, Object> {
+    PrimitiveConvert(Schema.Field field) {
+      super(field);
+    }
+
+    @Override
+    Object getFromProtoMessage(Message message) {
+      Object protoValue = message.getField(getFieldDescriptor(message));
+      return convertFromProtoValue(protoValue);
+    }
+
+    /** Identity conversion; subclasses override this for types that need translation. */
+    @Override
+    Object convertFromProtoValue(Object object) {
+      return object;
+    }
+
+    @Override
+    void setOnProtoMessage(Message.Builder message, Object value) {
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      message.setField(fieldDescriptor, value);
+    }
+
+    @Override
+    Object convertToProtoValue(FieldDescriptor fieldDescriptor, Object value) {
+      return value;
+    }
+  }
+
+  /**
+   * Converter for Bytes. Protobuf represents bytes natively as {@link ByteString}, while rows use
+   * {@code byte[]}; null and empty arrays are never set on the proto message.
+   */
+  static class BytesConvert extends PrimitiveConvert {
+    BytesConvert(Schema.Field field) {
+      super(field);
+    }
+
+    @Override
+    Object convertFromProtoValue(Object object) {
+      ByteString bytes = (ByteString) object;
+      return bytes.toByteArray();
+    }
+
+    @Override
+    void setOnProtoMessage(Message.Builder message, Object value) {
+      // The original author observed that protobuf BYTES fields did not accept empty bytes, so
+      // empty (and null) arrays are skipped. NOTE(review): confirm this is still required.
+      if (value == null || ((byte[]) value).length == 0) {
+        return;
+      }
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      message.setField(fieldDescriptor, convertToProtoValue(fieldDescriptor, value));
+    }
+
+    @Override
+    Object convertToProtoValue(FieldDescriptor fieldDescriptor, Object value) {
+      return value == null ? null : ByteString.copyFrom((byte[]) value);
+    }
+  }
+
+  /**
+   * Specific converter for proto wrapper values ({@code google.protobuf.*Value}), which are
+   * translated into nullable row values: an unset wrapper becomes null and a null row value leaves
+   * the wrapper unset.
+   */
+  static class WrapperConvert extends Convert<Object, Object> {
+    /** Converter for the single value field inside the wrapper message. */
+    private final Convert valueConvert;
+
+    WrapperConvert(Schema.Field field, Convert valueConvert) {
+      super(field);
+      this.valueConvert = valueConvert;
+    }
+
+    @Override
+    Object getFromProtoMessage(Message message) {
+      FieldDescriptor wrapperField = getFieldDescriptor(message);
+      if (!message.hasField(wrapperField)) {
+        return null;
+      }
+      Message wrapper = (Message) message.getField(wrapperField);
+      return valueConvert.getFromProtoMessage(wrapper);
+    }
+
+    @Override
+    Object convertFromProtoValue(Object object) {
+      return object;
+    }
+
+    @Override
+    void setOnProtoMessage(Message.Builder message, Object value) {
+      if (value == null) {
+        return;
+      }
+      FieldDescriptor wrapperField = getFieldDescriptor(message);
+      DynamicMessage.Builder wrapperBuilder =
+          DynamicMessage.newBuilder(wrapperField.getMessageType());
+      valueConvert.setOnProtoMessage(wrapperBuilder, value);
+      message.setField(wrapperField, wrapperBuilder.build());
+    }
+
+    @Override
+    Object convertToProtoValue(FieldDescriptor fieldDescriptor, Object value) {
+      return value;
+    }
+  }
+
+  /**
+   * Converter for timestamp messages. Proto to row yields a {@link Instant} built from field 1
+   * (seconds) and field 2 (nanos); row to proto reads a Row with INT64 seconds at index 0 and INT32
+   * nanos at index 1.
+   */
+  static class TimestampConvert extends Convert<Object, Object> {
+
+    TimestampConvert(Schema.Field field) {
+      super(field);
+    }
+
+    @Override
+    Object getFromProtoMessage(Message message) {
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      return message.hasField(fieldDescriptor)
+          ? convertFromProtoValue((Message) message.getField(fieldDescriptor))
+          : null;
+    }
+
+    @Override
+    Object convertFromProtoValue(Object object) {
+      Message timestamp = (Message) object;
+      Descriptors.Descriptor type = timestamp.getDescriptorForType();
+      FieldDescriptor secondsField = type.findFieldByNumber(1);
+      FieldDescriptor nanosField = type.findFieldByNumber(2);
+      long seconds = (long) timestamp.getField(secondsField);
+      int nanos = (int) timestamp.getField(nanosField);
+      return Instant.ofEpochSecond(seconds, nanos);
+    }
+
+    @Override
+    void setOnProtoMessage(Message.Builder message, Object value) {
+      if (value == null) {
+        return;
+      }
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      message.setField(fieldDescriptor, convertToProtoValue(fieldDescriptor, value));
+    }
+
+    @Override
+    Object convertToProtoValue(FieldDescriptor fieldDescriptor, Object value) {
+      Row row = (Row) value;
+      long seconds = row.getInt64(0);
+      int nanos = row.getInt32(1);
+      return com.google.protobuf.Timestamp.newBuilder().setSeconds(seconds).setNanos(nanos).build();
+    }
+  }
+
+  /**
+   * Converter for duration messages. Proto to row yields a {@link Duration} built from field 1
+   * (seconds) and field 2 (nanos); row to proto reads a Row with INT64 seconds at index 0 and INT32
+   * nanos at index 1.
+   */
+  static class DurationConvert extends Convert<Object, Object> {
+
+    DurationConvert(Schema.Field field) {
+      super(field);
+    }
+
+    @Override
+    Object getFromProtoMessage(Message message) {
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      return message.hasField(fieldDescriptor)
+          ? convertFromProtoValue((Message) message.getField(fieldDescriptor))
+          : null;
+    }
+
+    @Override
+    Duration convertFromProtoValue(Object object) {
+      Message duration = (Message) object;
+      Descriptors.Descriptor type = duration.getDescriptorForType();
+      FieldDescriptor secondsField = type.findFieldByNumber(1);
+      FieldDescriptor nanosField = type.findFieldByNumber(2);
+      long seconds = (long) duration.getField(secondsField);
+      int nanos = (int) duration.getField(nanosField);
+      return Duration.ofSeconds(seconds, nanos);
+    }
+
+    @Override
+    void setOnProtoMessage(Message.Builder message, Object value) {
+      if (value == null) {
+        return;
+      }
+      FieldDescriptor fieldDescriptor = getFieldDescriptor(message);
+      message.setField(fieldDescriptor, convertToProtoValue(fieldDescriptor, value));
+    }
+
+    @Override
+    Object convertToProtoValue(FieldDescriptor fieldDescriptor, Object value) {
+      Row row = (Row) value;
+      long seconds = row.getInt64(0);
+      int nanos = row.getInt32(1);
+      return com.google.protobuf.Duration.newBuilder().setSeconds(seconds).setNanos(nanos).build();
+    }
+  }
+
+  static class MessageConvert extends Convert<Object, Object> {
+    private final SerializableFunction fromRowFunction;
+    private final SerializableFunction toRowFunction;
+
+    MessageConvert(ProtoDynamicMessageSchema rootProtoSchema, Schema.Field 
field) {
+      super(field);
+      ProtoDynamicMessageSchema protoSchema =
+          ProtoDynamicMessageSchema.forContext(rootProtoSchema.context, field);
+      SchemaCoder<DynamicMessage> schemaCoder = protoSchema.getSchemaCoder();
 
 Review comment:
   I've changed the accesses here, but I still think that it's OK to use 
SchemaCoder as a transient container for the trio: Schema, ToRow and FromRow.
   
   If it's a blocker, I can split it up into 3 different transient fields.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 392086)
    Time Spent: 25h 50m  (was: 25h 40m)

> Protobuf Beam Schema support
> ----------------------------
>
>                 Key: BEAM-7274
>                 URL: https://issues.apache.org/jira/browse/BEAM-7274
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Alex Van Boxel
>            Assignee: Alex Van Boxel
>            Priority: Minor
>             Fix For: 2.20.0
>
>          Time Spent: 25h 50m
>  Remaining Estimate: 0h
>
> Add support for the new Beam Schema to the Protobuf extension.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to