liu-du commented on a change in pull request #16926:
URL: https://github.com/apache/beam/pull/16926#discussion_r815374329
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -196,102 +222,131 @@ private static Object messageValueFromFieldValue(
"Received null value for non-nullable field " +
fieldDescriptor.getName());
}
}
- return toProtoValue(fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
+ return toProtoValue(tableFieldSchema, fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
}
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String,
Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 ->
ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
@Nullable
@SuppressWarnings({"nullness"})
@VisibleForTesting
static Object toProtoValue(
- FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated)
{
+ TableFieldSchema tableFieldSchema,
+ FieldDescriptor fieldDescriptor,
+ Object jsonBQValue,
+ boolean isRepeated) {
if (isRepeated) {
return ((List<Object>) jsonBQValue)
- .stream().map(v -> toProtoValue(fieldDescriptor, v,
false)).collect(toList());
+ .stream()
+ .map(v -> toProtoValue(tableFieldSchema, fieldDescriptor, v,
false))
+ .collect(toList());
}
if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
if (jsonBQValue instanceof TableRow) {
TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
+ return messageFromTableRow(
+ tableFieldSchema.getFields(), fieldDescriptor.getMessageType(),
tableRow);
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map);
+ return messageFromMap(tableFieldSchema.getFields(),
fieldDescriptor.getMessageType(), map);
} else {
throw new RuntimeException("Unexpected value " + jsonBQValue + "
Expected a JSON map.");
}
}
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor,
jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
- } else {
- return scalarValue;
- }
+ return scalarToProtoValue(tableFieldSchema, jsonBQValue);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object
jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper =
JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
+ static Object scalarToProtoValue(TableFieldSchema tableFieldSchema, Object
jsonBQValue) {
+ if (jsonBQValue == null) {
+ // nullable value
+ return null;
}
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
+ switch (tableFieldSchema.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (jsonBQValue instanceof String) {
+ return Long.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ } else if (jsonBQValue instanceof Long) {
return jsonBQValue;
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (jsonBQValue instanceof String) {
+ return Double.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return ((Float) jsonBQValue).longValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
+ case "BOOLEAN":
+ case "BOOL":
+ if (jsonBQValue instanceof String) {
+ return Boolean.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Boolean) {
return jsonBQValue;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
+ case "BYTES":
+ if (jsonBQValue instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String)
jsonBQValue));
+ } else if (jsonBQValue instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) jsonBQValue);
+ } else if (jsonBQValue instanceof ByteString) {
return jsonBQValue;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (jsonBQValue instanceof String) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH,
Instant.parse((String) jsonBQValue));
Review comment:
Yes, `Instant.parse` uses the ISO 8601 instant format (`DateTimeFormatter.ISO_INSTANT`), e.g. `2011-12-03T10:15:30Z`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]