TheNeuralBit commented on a change in pull request #16875:
URL: https://github.com/apache/beam/pull/16875#discussion_r813392086
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
##########
@@ -579,6 +579,16 @@ public boolean isSupertypeOf(TypeName other) {
/** Convert the Java type used by the base {@link FieldType} to the input
type. */
@NonNull
InputT toInputType(@NonNull BaseT base);
+
+ /** Convert the Java type used by the base {@link FieldType} to the input
type. */
+ default BaseT toBaseTypeGeneric(Object input) {
+ return toBaseType((InputT) input);
+ }
+
+ /** Convert the Java type used by the base {@link FieldType} to the input
type. */
+ default InputT toInputTypeGeneric(Object base) {
+ return toInputType((BaseT) base);
+ }
Review comment:
Is there a way we could do this without adding these helpers? I'd rather
keep the type erasure confined to `SchemaTranslation`.
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaTranslation.java
##########
@@ -411,26 +426,76 @@ public static Object rowFromProto(SchemaApi.Row row,
FieldType fieldType) {
.build();
case ROW:
return builder.setRowValue(rowToProto((Row) value)).build();
+ case DATETIME:
+ return builder
+ .setLogicalTypeValue(logicalTypeToProtoViaCoder(FieldType.INT64,
fieldType, value))
+ .build();
+ case DECIMAL:
+ return builder
+ .setLogicalTypeValue(logicalTypeToProtoViaCoder(FieldType.BYTES,
fieldType, value))
+ .build();
case LOGICAL_TYPE:
+ return builder
+
.setLogicalTypeValue(logicalTypeToProto(fieldType.getLogicalType(), value))
+ .build();
default:
return builder.setAtomicValue(primitiveRowFieldToProto(fieldType,
value)).build();
}
}
+ /** Returns if the given field is null and throws exception if it is and
can't be. */
+ static boolean isNullFieldValueFromProto(FieldType fieldType, boolean
hasNonNullValue) {
+ if (!hasNonNullValue && !fieldType.getNullable()) {
+ throw new RuntimeException("FieldTypeValue has no value but the field
cannot be null.");
+ }
+ return !hasNonNullValue;
+ }
+
static Object fieldValueFromProto(FieldType fieldType, SchemaApi.FieldValue
value) {
switch (fieldType.getTypeName()) {
case ARRAY:
+ if (isNullFieldValueFromProto(fieldType, value.hasArrayValue())) {
+ return null;
+ }
return arrayValueFromProto(fieldType.getCollectionElementType(),
value.getArrayValue());
case ITERABLE:
+ if (isNullFieldValueFromProto(fieldType, value.hasIterableValue())) {
+ return null;
+ }
return iterableValueFromProto(
fieldType.getCollectionElementType(), value.getIterableValue());
case MAP:
+ if (isNullFieldValueFromProto(fieldType, value.hasMapValue())) {
+ return null;
+ }
return mapFromProto(
fieldType.getMapKeyType(), fieldType.getMapValueType(),
value.getMapValue());
case ROW:
+ if (isNullFieldValueFromProto(fieldType, value.hasRowValue())) {
+ return null;
+ }
return rowFromProto(value.getRowValue(), fieldType);
case LOGICAL_TYPE:
+ if (isNullFieldValueFromProto(fieldType, value.hasLogicalTypeValue()))
{
+ return null;
+ }
+ return logicalTypeFromProto(fieldType.getLogicalType(), value);
+ case DATETIME:
+ if (isNullFieldValueFromProto(fieldType, value.hasLogicalTypeValue()))
{
+ return null;
+ }
+ return logicalTypeFromProtoViaCoder(
+ FieldType.INT64, fieldType, value.getLogicalTypeValue());
+ case DECIMAL:
+ if (isNullFieldValueFromProto(fieldType, value.hasLogicalTypeValue()))
{
+ return null;
+ }
+ return logicalTypeFromProtoViaCoder(
+ FieldType.BYTES, fieldType, value.getLogicalTypeValue());
Review comment:
Why add `logicalTypeFromProtoViaCoder`? Couldn't we just use
`logicalTypeFromProto`?
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTranslationTest.java
##########
@@ -290,6 +297,74 @@ public void fromProtoAndToProto() throws Exception {
}
}
+ /** Tests round-trip proto encodings for {@link Row}. */
+ @RunWith(Parameterized.class)
+ public static class RowToFromProtoTest {
+
+ public static Row simpleRow(FieldType type, Object value) {
+ return Row.withSchema(Schema.of(Field.of("s",
type))).addValue(value).build();
+ }
+
+ public static Row simpleNullRow(FieldType type) {
+ return Row.withSchema(Schema.of(Field.nullable("s",
type))).addValue(null).build();
+ }
+
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<Row> data() {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("string", 42);
+ List<String> list = new ArrayList<>();
+ list.add("string");
+ Schema schema =
+ Schema.builder()
+ .addField("field_one", FieldType.STRING)
+ .addField("field_two", FieldType.INT32)
+ .build();
+ Row row = Row.withSchema(schema).addValue("value").addValue(42).build();
+
+ return ImmutableList.<Row>builder()
+ .add(simpleRow(FieldType.STRING, "string"))
+ .add(simpleRow(FieldType.BOOLEAN, true))
+ .add(simpleRow(FieldType.BYTE, (byte) 12))
+ .add(simpleRow(FieldType.INT16, (short) 12))
+ .add(simpleRow(FieldType.INT32, 12))
+ .add(simpleRow(FieldType.INT64, 12L))
+ .add(simpleRow(FieldType.BYTES, new byte[] {0x42, 0x69, 0x00}))
+ .add(simpleRow(FieldType.FLOAT, (float) 12))
+ .add(simpleRow(FieldType.DOUBLE, 12.0))
+ .add(simpleRow(FieldType.map(FieldType.STRING, FieldType.INT32),
map))
+ .add(simpleRow(FieldType.array(FieldType.STRING), list))
+ .add(simpleRow(FieldType.row(row.getSchema()), row))
+ .add(simpleRow(FieldType.DATETIME, new Instant(23L)))
+ .add(simpleRow(FieldType.DECIMAL, BigDecimal.valueOf(100000)))
+ .add(simpleRow(FieldType.logicalType(new NullArgumentLogicalType()),
"str"))
+ .add(simpleRow(FieldType.logicalType(new DateTime()),
LocalDateTime.of(2000, 1, 3, 3, 1)))
+ .add(simpleNullRow(FieldType.STRING))
+ .add(simpleNullRow(FieldType.INT32))
+ .add(simpleNullRow(FieldType.map(FieldType.STRING, FieldType.INT32)))
+ .add(simpleNullRow(FieldType.array(FieldType.STRING)))
+ .add(simpleNullRow(FieldType.row(row.getSchema())))
+ .add(simpleNullRow(FieldType.logicalType(new
NullArgumentLogicalType())))
+ .add(simpleNullRow(FieldType.logicalType(new DateTime())))
+ .add(simpleNullRow(FieldType.DECIMAL))
+ .add(simpleNullRow(FieldType.DATETIME))
+ .build();
+ }
+
+ @Parameter(0)
+ public Row row;
+
+ @Test
+ public void toAndFromProto() throws Exception {
+ SchemaApi.Row rowProto = SchemaTranslation.rowToProto(row);
+ Row decodedRow =
+ (Row) SchemaTranslation.rowFromProto(rowProto,
FieldType.row(row.getSchema()));
+ System.out.println(decodedRow);
+ System.out.println(row);
Review comment:
Should these print statements be dropped?
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
##########
@@ -598,6 +603,9 @@ public String toString(boolean includeFieldNames) {
}
private String toString(Schema.FieldType fieldType, Object value, boolean
includeFieldNames) {
+ if (value == null) {
+ return "";
Review comment:
Maybe put a placeholder here instead, like `"<null>"`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]