prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1040387046


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to 
dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), 
TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), 
TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), 
TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> 
ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) 
o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> 
LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> 
convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), 
AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> 
convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value 
as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) 
value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type 
(days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be 
used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer 
message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used 
for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          
Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, 
field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field 
field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {

Review Comment:
   Good catch — I moved the null check to before the switch statement.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to 
dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), 
TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), 
TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), 
TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> 
ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) 
o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> 
LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> 
convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), 
AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> 
convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value 
as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) 
value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type 
(days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be 
used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer 
message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used 
for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          
Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, 
field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field 
field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        Preconditions.checkState(!schema.getFields().isEmpty());

Review Comment:
   They are allowed; the same check exists in the analogous
`BeamRowToStorageApiProto` methods (Beam Row `Schema` to proto descriptors), and
I wanted to keep the same safeguards here.
   
   I'm not sure how useful an empty record would be in this context, but let
me know if you want me to remove the restriction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to