[
https://issues.apache.org/jira/browse/BEAM-13990?focusedWorklogId=733575&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-733575
]
ASF GitHub Bot logged work on BEAM-13990:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Feb/22 01:19
Start Date: 27/Feb/22 01:19
Worklog Time Spent: 10m
Work Description: liu-du commented on a change in pull request #16926:
URL: https://github.com/apache/beam/pull/16926#discussion_r815374329
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -196,102 +222,131 @@ private static Object messageValueFromFieldValue(
"Received null value for non-nullable field " +
fieldDescriptor.getName());
}
}
- return toProtoValue(fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
+ return toProtoValue(tableFieldSchema, fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
}
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String,
Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 ->
ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
@Nullable
@SuppressWarnings({"nullness"})
@VisibleForTesting
static Object toProtoValue(
- FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated)
{
+ TableFieldSchema tableFieldSchema,
+ FieldDescriptor fieldDescriptor,
+ Object jsonBQValue,
+ boolean isRepeated) {
if (isRepeated) {
return ((List<Object>) jsonBQValue)
- .stream().map(v -> toProtoValue(fieldDescriptor, v,
false)).collect(toList());
+ .stream()
+ .map(v -> toProtoValue(tableFieldSchema, fieldDescriptor, v,
false))
+ .collect(toList());
}
if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
if (jsonBQValue instanceof TableRow) {
TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
+ return messageFromTableRow(
+ tableFieldSchema.getFields(), fieldDescriptor.getMessageType(),
tableRow);
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map);
+ return messageFromMap(tableFieldSchema.getFields(),
fieldDescriptor.getMessageType(), map);
} else {
throw new RuntimeException("Unexpected value " + jsonBQValue + "
Expected a JSON map.");
}
}
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor,
jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
- } else {
- return scalarValue;
- }
+ return scalarToProtoValue(tableFieldSchema, jsonBQValue);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object
jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper =
JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
+ static Object scalarToProtoValue(TableFieldSchema tableFieldSchema, Object
jsonBQValue) {
+ if (jsonBQValue == null) {
+ // nullable value
+ return null;
}
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
+ switch (tableFieldSchema.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (jsonBQValue instanceof String) {
+ return Long.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ } else if (jsonBQValue instanceof Long) {
return jsonBQValue;
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (jsonBQValue instanceof String) {
+ return Double.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return ((Float) jsonBQValue).longValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
+ case "BOOLEAN":
+ case "BOOL":
+ if (jsonBQValue instanceof String) {
+ return Boolean.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Boolean) {
return jsonBQValue;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
+ case "BYTES":
+ if (jsonBQValue instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String)
jsonBQValue));
+ } else if (jsonBQValue instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) jsonBQValue);
+ } else if (jsonBQValue instanceof ByteString) {
return jsonBQValue;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (jsonBQValue instanceof String) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH,
Instant.parse((String) jsonBQValue));
Review comment:
Yes, Instant.parse uses the ISO 8601 instant format (DateTimeFormatter.ISO_INSTANT).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 733575)
Remaining Estimate: 112h 20m (was: 112.5h)
Time Spent: 7h 40m (was: 7.5h)
> BigQueryIO cannot write to DATE and TIMESTAMP columns when using Storage
> Write API
> -----------------------------------------------------------------------------------
>
> Key: BEAM-13990
> URL: https://issues.apache.org/jira/browse/BEAM-13990
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.36.0
> Reporter: Du Liu
> Assignee: Du Liu
> Priority: P2
> Original Estimate: 120h
> Time Spent: 7h 40m
> Remaining Estimate: 112h 20m
>
> When using the Storage Write API with BigQueryIO, DATE and TIMESTAMP values are
> currently converted to String type in the protobuf message. This is incorrect:
> according to the Storage Write API [documentation|#data_type_conversions], DATE
> should be converted to int32 and TIMESTAMP should be converted to int64.
> Here's the error message:
> INFO: Stream finished with error
> com.google.api.gax.rpc.InvalidArgumentException:
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched
> with BigQuery field at D6cbe536b_4dab_4292_8fda_ff2932dded49.datevalue, the
> proto field type string, BigQuery field type DATE Entity
> I have included an integration test here:
> [https://github.com/liu-du/beam/commit/b56823d1d213adf6ca5564ce1d244cc4ae8f0816]
>
> The problem occurs because DATE and TIMESTAMP are converted to String in the
> protobuf message here:
> [https://github.com/apache/beam/blob/a78fec72d0d9198eef75144a7bdaf93ada5abf9b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L69]
>
> The Storage Write API rejects the request because it expects int32/int64
> values.
>
> I've opened a PR here: https://github.com/apache/beam/pull/16926
--
This message was sent by Atlassian Jira
(v8.20.1#820001)