amaliujia commented on a change in pull request #13771:
URL: https://github.com/apache/beam/pull/13771#discussion_r580779316



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema 
tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  /**
+   * Builds a protocol-buffer message descriptor from the fields of a BigQuery table schema.
+   *
+   * @param tableSchema the BigQuery table schema whose fields are converted
+   * @return the descriptor produced by {@link #descriptorSchemaFromTableFieldSchemas}
+   */
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    Iterable<TableFieldSchema> topLevelFields = tableSchema.getFields();
+    return descriptorSchemaFromTableFieldSchemas(topLevelFields);
+  }
+
+  /**
+   * Converts a sequence of BigQuery field schemas into a message descriptor.
+   *
+   * <p>The descriptor is given a random name ("D" followed by a UUID whose dashes are replaced
+   * by underscores). Fields are numbered consecutively from 1 in iteration order.
+   *
+   * @param tableFieldSchemas the BigQuery fields to convert
+   * @return the built message descriptor
+   */
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    String messageName = "D" + UUID.randomUUID().toString().replace("-", "_");
+    DescriptorProto.Builder messageBuilder = DescriptorProto.newBuilder().setName(messageName);
+    int nextFieldNumber = 0;
+    for (TableFieldSchema tableField : tableFieldSchemas) {
+      nextFieldNumber++;
+      fieldDescriptorFromTableField(tableField, nextFieldNumber, messageBuilder);
+    }
+    return messageBuilder.build();
+  }
+
+  /**
+   * Appends a proto field descriptor for one BigQuery field to {@code descriptorBuilder}.
+   *
+   * <p>Scalar BigQuery types are mapped to proto scalar types. STRUCT/RECORD fields are converted
+   * recursively; the resulting message is added to {@code descriptorBuilder} as a nested type and
+   * referenced by its name. The BigQuery mode selects the label: REPEATED becomes repeated, a
+   * missing mode or NULLABLE becomes optional, and any other mode becomes required.
+   *
+   * @param fieldSchema the BigQuery field to convert
+   * @param fieldNumber the proto field number to assign
+   * @param descriptorBuilder the message descriptor under construction; mutated by this call
+   * @throws UnsupportedOperationException if the BigQuery type has no mapping here
+   */
+  public static void fieldDescriptorFromTableField(
+      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder =
+        FieldDescriptorProto.newBuilder().setName(fieldSchema.getName()).setNumber(fieldNumber);
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        fieldDescriptorBuilder.setType(Type.TYPE_STRING);
+        break;
+      case "BYTES":
+        fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
+        break;
+      case "INT64":
+      case "INTEGER":
+        fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "FLOAT64":
+      case "FLOAT":
+        // BigQuery FLOAT/FLOAT64 is a 64-bit value; a 32-bit proto float would lose precision.
+        // NOTE(review): confirm the value conversion yields Double for these fields.
+        fieldDescriptorBuilder.setType(Type.TYPE_DOUBLE);
+        break;
+      case "BOOL":
+      case "BOOLEAN":
+        fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
+        break;
+      case "TIMESTAMP":
+      case "TIME":
+      case "DATETIME":
+        fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "DATE":
+        fieldDescriptorBuilder.setType(Type.TYPE_INT32);
+        break;
+      case "STRUCT":
+      case "RECORD":
+        // Nested records become nested message types, referenced by their generated name.
+        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+        descriptorBuilder.addNestedType(nested);
+        fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + fieldSchema.getType() + " to proto type is unsupported");
+    }
+
+    // A field without an explicit mode is treated the same as NULLABLE.
+    Mode fieldMode =
+        Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf).orElse(Mode.NULLABLE);
+    if (fieldMode == Mode.REPEATED) {
+      fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+    } else if (fieldMode == Mode.NULLABLE) {
+      fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    descriptorBuilder.addField(fieldDescriptorBuilder.build());
+  }
+
+  /**
+   * Builds a {@link DynamicMessage} for the given descriptor from a BigQuery {@link TableRow}.
+   *
+   * <p>Each field of the descriptor is looked up in the row by name and converted with {@link
+   * #messageValueFromFieldValue}; fields whose converted value is null are left unset.
+   *
+   * @param descriptor the message type to build
+   * @param tableRow the row supplying the field values
+   * @return the populated message
+   */
+  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+    DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
+    for (FieldDescriptor field : descriptor.getFields()) {
+      Object rowValue = tableRow.get(field.getName());
+      Object protoValue = messageValueFromFieldValue(field, rowValue);
+      if (protoValue == null) {
+        continue;
+      }
+      messageBuilder.setField(field, protoValue);
+    }
+    return messageBuilder.build();
+  }
+
+  /**
+   * Converts a single BigQuery field value into the value to store in the given proto field.
+   *
+   * <p>A null input yields null for an optional field and an empty list for a repeated field (an
+   * absent array is treated as empty). A null input for any other field is rejected.
+   *
+   * @param fieldDescriptor the proto field the value is destined for
+   * @param bqValue the BigQuery value, possibly null
+   * @return the converted value, null for an unset optional field, or an empty list for an unset
+   *     repeated field
+   * @throws IllegalArgumentException if {@code bqValue} is null and the field is required
+   */
+  public static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
+    if (bqValue != null) {
+      return toProtoValue(fieldDescriptor, bqValue);
+    }
+    if (fieldDescriptor.isOptional()) {
+      return null;
+    }
+    if (fieldDescriptor.isRepeated()) {
+      // isOptional() is false for repeated fields; a missing array means "no elements".
+      return java.util.Collections.emptyList();
+    }
+    throw new IllegalArgumentException(
+        "Received null value for non-nullable field " + fieldDescriptor.getName());
+  }
+
+  // Maps a proto field type to a function that parses a String into a value of the matching
+  // Java type: Integer, Long, Float, Double, Boolean, String, or a ByteString decoded from
+  // base64. Field types without an entry have no String parser in this table.
+  private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
+      ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+          .put(FieldDescriptor.Type.INT32, text -> Integer.valueOf(text))
+          .put(FieldDescriptor.Type.INT64, text -> Long.valueOf(text))
+          .put(FieldDescriptor.Type.FLOAT, text -> Float.valueOf(text))
+          .put(FieldDescriptor.Type.DOUBLE, text -> Double.valueOf(text))
+          .put(FieldDescriptor.Type.BOOL, text -> Boolean.valueOf(text))
+          .put(FieldDescriptor.Type.STRING, text -> text)
+          .put(
+              FieldDescriptor.Type.BYTES,
+              encoded -> {
+                byte[] decoded = BaseEncoding.base64().decode(encoded);
+                return ByteString.copyFrom(decoded);
+              })
+          .build();
+
+  private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object 
jsonBQValue) {
+    if (jsonBQValue instanceof String) {
+      Function<String, Object> mapper = 
JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
+      if (mapper != null) {
+        return mapper.apply((String) jsonBQValue);
+      }
+    } else if (jsonBQValue instanceof Integer) {
+      switch (fieldDescriptor.getJavaType()) {
+        case INT:
+          return Integer.valueOf((int) jsonBQValue);
+        case LONG:
+          return Long.valueOf((int) jsonBQValue);
+        default:
+          throw new RuntimeException("foo");

Review comment:
       +1




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to