reuvenlax commented on a change in pull request #16926:
URL: https://github.com/apache/beam/pull/16926#discussion_r814348203
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
##########
@@ -68,7 +73,17 @@ public long getEncodedElementByteSize(TableRow value) throws
Exception {
// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
private static final ObjectMapper MAPPER =
- new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+ JsonMapper.builder()
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS)
+ .addModule(new JavaTimeModule())
+ // serialize Date/Time to string instead of floats
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
+ // serialize BigDecimal to string without scientific notation
instead of floats
+ .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true)
+ .withConfigOverride(
+ BigDecimal.class,
+ it ->
it.setFormat(JsonFormat.Value.forShape(JsonFormat.Shape.STRING)))
Review comment:
We cannot make backwards-incompatible changes to this coder. Users often
update in-flight pipelines with new Beam versions, and if the new coder can't
parse data encoded by the old coder, the updated pipeline will get stuck. Why
did you need to make this change?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -196,102 +222,131 @@ private static Object messageValueFromFieldValue(
"Received null value for non-nullable field " +
fieldDescriptor.getName());
}
}
- return toProtoValue(fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
+ return toProtoValue(tableFieldSchema, fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
}
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String,
Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 ->
ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
@Nullable
@SuppressWarnings({"nullness"})
@VisibleForTesting
static Object toProtoValue(
- FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated)
{
+ TableFieldSchema tableFieldSchema,
+ FieldDescriptor fieldDescriptor,
+ Object jsonBQValue,
+ boolean isRepeated) {
if (isRepeated) {
return ((List<Object>) jsonBQValue)
- .stream().map(v -> toProtoValue(fieldDescriptor, v,
false)).collect(toList());
+ .stream()
+ .map(v -> toProtoValue(tableFieldSchema, fieldDescriptor, v,
false))
+ .collect(toList());
}
if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
if (jsonBQValue instanceof TableRow) {
TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
+ return messageFromTableRow(
+ tableFieldSchema.getFields(), fieldDescriptor.getMessageType(),
tableRow);
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map);
+ return messageFromMap(tableFieldSchema.getFields(),
fieldDescriptor.getMessageType(), map);
} else {
throw new RuntimeException("Unexpected value " + jsonBQValue + "
Expected a JSON map.");
}
}
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor,
jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
- } else {
- return scalarValue;
- }
+ return scalarToProtoValue(tableFieldSchema, jsonBQValue);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object
jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper =
JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
+ static Object scalarToProtoValue(TableFieldSchema tableFieldSchema, Object
jsonBQValue) {
+ if (jsonBQValue == null) {
+ // nullable value
+ return null;
}
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
+ switch (tableFieldSchema.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (jsonBQValue instanceof String) {
+ return Long.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ } else if (jsonBQValue instanceof Long) {
return jsonBQValue;
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (jsonBQValue instanceof String) {
+ return Double.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return ((Float) jsonBQValue).longValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
+ case "BOOLEAN":
+ case "BOOL":
+ if (jsonBQValue instanceof String) {
+ return Boolean.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Boolean) {
return jsonBQValue;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
+ case "BYTES":
+ if (jsonBQValue instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String)
jsonBQValue));
+ } else if (jsonBQValue instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) jsonBQValue);
+ } else if (jsonBQValue instanceof ByteString) {
return jsonBQValue;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (jsonBQValue instanceof String) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH,
Instant.parse((String) jsonBQValue));
+ } else if (jsonBQValue instanceof Instant) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant)
jsonBQValue);
+ } else if (jsonBQValue instanceof Timestamp) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, ((Timestamp)
jsonBQValue).toInstant());
+ } else if (jsonBQValue instanceof Long) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ }
Review comment:
If you're handling java.time objects, you should also support Joda-Time
objects, as they are heavily used inside Beam.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -196,102 +222,131 @@ private static Object messageValueFromFieldValue(
"Received null value for non-nullable field " +
fieldDescriptor.getName());
}
}
- return toProtoValue(fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
+ return toProtoValue(tableFieldSchema, fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
}
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String,
Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 ->
ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
@Nullable
@SuppressWarnings({"nullness"})
@VisibleForTesting
static Object toProtoValue(
- FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated)
{
+ TableFieldSchema tableFieldSchema,
+ FieldDescriptor fieldDescriptor,
+ Object jsonBQValue,
+ boolean isRepeated) {
if (isRepeated) {
return ((List<Object>) jsonBQValue)
- .stream().map(v -> toProtoValue(fieldDescriptor, v,
false)).collect(toList());
+ .stream()
+ .map(v -> toProtoValue(tableFieldSchema, fieldDescriptor, v,
false))
+ .collect(toList());
}
if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
if (jsonBQValue instanceof TableRow) {
TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
+ return messageFromTableRow(
+ tableFieldSchema.getFields(), fieldDescriptor.getMessageType(),
tableRow);
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map);
+ return messageFromMap(tableFieldSchema.getFields(),
fieldDescriptor.getMessageType(), map);
} else {
throw new RuntimeException("Unexpected value " + jsonBQValue + "
Expected a JSON map.");
}
}
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor,
jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
- } else {
- return scalarValue;
- }
+ return scalarToProtoValue(tableFieldSchema, jsonBQValue);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object
jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper =
JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
+ static Object scalarToProtoValue(TableFieldSchema tableFieldSchema, Object
jsonBQValue) {
+ if (jsonBQValue == null) {
+ // nullable value
+ return null;
}
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
+ switch (tableFieldSchema.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (jsonBQValue instanceof String) {
+ return Long.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ } else if (jsonBQValue instanceof Long) {
return jsonBQValue;
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (jsonBQValue instanceof String) {
+ return Double.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return ((Float) jsonBQValue).longValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
+ case "BOOLEAN":
+ case "BOOL":
+ if (jsonBQValue instanceof String) {
+ return Boolean.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Boolean) {
return jsonBQValue;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
+ case "BYTES":
+ if (jsonBQValue instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String)
jsonBQValue));
+ } else if (jsonBQValue instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) jsonBQValue);
+ } else if (jsonBQValue instanceof ByteString) {
return jsonBQValue;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (jsonBQValue instanceof String) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH,
Instant.parse((String) jsonBQValue));
Review comment:
Does this parse ISO 8601 formatted strings?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -196,102 +222,131 @@ private static Object messageValueFromFieldValue(
"Received null value for non-nullable field " +
fieldDescriptor.getName());
}
}
- return toProtoValue(fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
+ return toProtoValue(tableFieldSchema, fieldDescriptor, bqValue,
fieldDescriptor.isRepeated());
}
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String,
Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 ->
ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
@Nullable
@SuppressWarnings({"nullness"})
@VisibleForTesting
static Object toProtoValue(
- FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated)
{
+ TableFieldSchema tableFieldSchema,
+ FieldDescriptor fieldDescriptor,
+ Object jsonBQValue,
+ boolean isRepeated) {
if (isRepeated) {
return ((List<Object>) jsonBQValue)
- .stream().map(v -> toProtoValue(fieldDescriptor, v,
false)).collect(toList());
+ .stream()
+ .map(v -> toProtoValue(tableFieldSchema, fieldDescriptor, v,
false))
+ .collect(toList());
}
if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
if (jsonBQValue instanceof TableRow) {
TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow);
+ return messageFromTableRow(
+ tableFieldSchema.getFields(), fieldDescriptor.getMessageType(),
tableRow);
} else if (jsonBQValue instanceof AbstractMap) {
// This will handle nested rows.
AbstractMap<String, Object> map = ((AbstractMap<String, Object>)
jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map);
+ return messageFromMap(tableFieldSchema.getFields(),
fieldDescriptor.getMessageType(), map);
} else {
throw new RuntimeException("Unexpected value " + jsonBQValue + "
Expected a JSON map.");
}
}
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor,
jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
- } else {
- return scalarValue;
- }
+ return scalarToProtoValue(tableFieldSchema, jsonBQValue);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object
jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper =
JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
+ static Object scalarToProtoValue(TableFieldSchema tableFieldSchema, Object
jsonBQValue) {
+ if (jsonBQValue == null) {
+ // nullable value
+ return null;
}
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
+ switch (tableFieldSchema.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (jsonBQValue instanceof String) {
+ return Long.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Integer) {
+ return ((Integer) jsonBQValue).longValue();
+ } else if (jsonBQValue instanceof Long) {
return jsonBQValue;
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (jsonBQValue instanceof String) {
+ return Double.valueOf((String) jsonBQValue);
+ } else if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return ((Float) jsonBQValue).longValue();
Review comment:
Why `longValue()`? Did you mean `doubleValue()`?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
##########
@@ -23,24 +23,18 @@
import com.google.api.services.bigquery.model.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
-import java.time.Duration;
+import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
@SuppressWarnings({"nullness"})
public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
private final CreateDisposition createDisposition;
- // TODO: Is this cache needed? All callers of getMessageConverter are
already caching the resullt.
- private final Cache<DestinationT, Descriptor> destinationDescriptorCache =
-
CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(15)).build();
Review comment:
If this change is unrelated to this bug, let's revert it for now; it adds
risk to this PR.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -88,17 +95,33 @@ public static Descriptor
getDescriptorFromTableSchema(TableSchema jsonSchema)
return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
}
+ private static TableFieldSchema getByName(
+ List<TableFieldSchema> tableFieldSchemaList, String name) {
+ for (TableFieldSchema tableFieldSchema : tableFieldSchemaList) {
+ if (tableFieldSchema.getName().equals(name)) {
+ return tableFieldSchema;
+ }
+ }
+ throw new RuntimeException("cannot find table schema for " + name);
+ }
+
public static DynamicMessage messageFromMap(
- Descriptor descriptor, AbstractMap<String, Object> map) {
+ List<TableFieldSchema> tableFieldSchemaList,
+ Descriptor descriptor,
+ AbstractMap<String, Object> map) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (Map.Entry<String, Object> entry : map.entrySet()) {
+
@Nullable
FieldDescriptor fieldDescriptor =
descriptor.findFieldByName(entry.getKey().toLowerCase());
if (fieldDescriptor == null) {
throw new RuntimeException(
"TableRow contained unexpected field with name " + entry.getKey());
}
- @Nullable Object value = messageValueFromFieldValue(fieldDescriptor,
entry.getValue());
+ TableFieldSchema tableFieldSchema = getByName(tableFieldSchemaList,
entry.getKey());
Review comment:
This makes message conversion quadratic in the number of fields, because
`getByName` scans the whole schema list for every field in the row. There are
tables out there with a lot of fields!
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
Review comment:
This is great! Make sure to run this test on the Dataflow runner as
well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]