This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5307c7138af Support MAP type in RowJson and fix datetime parsing with spaces (#38865)
5307c7138af is described below
commit 5307c7138af8b5daba7a5495aba87d53ae9b0ae7
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Jun 11 16:38:30 2026 -0400
Support MAP type in RowJson and fix datetime parsing with spaces (#38865)
---
.../java/org/apache/beam/sdk/util/RowJson.java | 65 +++++++++++++++++++++-
.../beam/sdk/util/RowJsonValueExtractors.java | 9 ++-
.../java/org/apache/beam/sdk/util/RowJsonTest.java | 26 ++++++++-
3 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
index c63f673ade2..ccfbd87d450 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJson.java
@@ -26,6 +26,7 @@ import static
org.apache.beam.sdk.schemas.Schema.TypeName.FLOAT;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
import static
org.apache.beam.sdk.util.RowJsonValueExtractors.booleanValueExtractor;
import static
org.apache.beam.sdk.util.RowJsonValueExtractors.byteValueExtractor;
@@ -57,6 +58,9 @@ import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
@@ -95,7 +99,8 @@ import org.joda.time.ReadableInstant;
})
public class RowJson {
private static final ImmutableSet<TypeName> SUPPORTED_TYPES =
- ImmutableSet.of(BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN,
STRING, DECIMAL, DATETIME);
+ ImmutableSet.of(
+ BYTE, INT16, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, STRING, DECIMAL,
DATETIME, MAP);
private static final ImmutableSet<String> KNOWN_LOGICAL_TYPE_IDENTIFIERS =
ImmutableSet.of(
SqlTypes.DATE.getIdentifier(),
@@ -160,6 +165,14 @@ public class RowJson {
return findUnsupportedFields(fieldType.getCollectionElementType(),
fieldName + "[]");
}
+ if (fieldTypeName.isMapType()) {
+ if (!STRING.equals(fieldType.getMapKeyType().getTypeName())) {
+ return ImmutableList.of(
+ new UnsupportedField(fieldName + ".key",
fieldType.getMapKeyType().getTypeName()));
+ }
+ return findUnsupportedFields(fieldType.getMapValueType(), fieldName +
"{}");
+ }
+
if (fieldTypeName.isLogicalType()) {
if
(KNOWN_LOGICAL_TYPE_IDENTIFIERS.contains(fieldType.getLogicalType().getIdentifier()))
{
return ImmutableList.of();
@@ -303,6 +316,10 @@ public class RowJson {
return jsonArrayToList(fieldValue);
}
+ if (fieldValue.isMapType()) {
+ return jsonObjectToMap(fieldValue);
+ }
+
if (fieldValue.typeName().isLogicalType()) {
String identifier = fieldValue.type().getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
@@ -365,6 +382,32 @@ public class RowJson {
.collect(toImmutableList());
}
+ private Map<String, Object> jsonObjectToMap(FieldValue mapFieldValue) {
+ // A MAP field must arrive as a JSON object; reject any other node type.
+ if (!mapFieldValue.isJsonObject()) {
+ String actualNodeType = mapFieldValue.jsonNodeType().name();
+ throw new UnsupportedRowJsonException(
+ "Expected JSON object for field '" + mapFieldValue.name() + "'. Instead got " + actualNodeType);
+ }
+
+ FieldType valueType = mapFieldValue.mapValueType();
+ Map<String, Object> converted = new HashMap<>();
+ for (Iterator<Map.Entry<String, JsonNode>> entries = mapFieldValue.jsonValue().fields();
+ entries.hasNext(); ) {
+ Map.Entry<String, JsonNode> entry = entries.next();
+
+ // Every member is decoded with the map's value type; the nested FieldValue
+ // is named like "<field>['<key>']".
+ String entryName = mapFieldValue.name() + "['" + entry.getKey() + "']";
+ Object decoded =
+ extractJsonNodeValue(FieldValue.of(entryName, valueType, entry.getValue()));
+ converted.put(entry.getKey(), decoded);
+ }
+
+ // Backed by a HashMap, so JSON member order is not carried into the result.
+ return converted;
+ }
+
private static Object extractJsonPrimitiveValue(FieldValue fieldValue) {
try {
return
JSON_VALUE_GETTERS.get(fieldValue.typeName()).extractValue(fieldValue.jsonValue());
@@ -440,6 +483,18 @@ public class RowJson {
return type().getRowSchema();
}
+ boolean isMapType() { // Whether this value's schema type is TypeName.MAP.
+ return type().getTypeName().isMapType();
+ }
+
+ FieldType mapValueType() { // Type used to decode each map value (see jsonObjectToMap).
+ return type().getMapValueType();
+ }
+
+ FieldType mapKeyType() { // NOTE(review): not referenced elsewhere in this diff.
+ return type().getMapKeyType();
+ }
+
static FieldValue of(String name, FieldType type, JsonNode jsonValue) {
return new AutoValue_RowJson_RowJsonDeserializer_FieldValue(name,
type, jsonValue);
}
@@ -538,6 +593,14 @@ public class RowJson {
case ROW:
writeRow((Row) value, type.getRowSchema(), gen);
break;
+ case MAP:
+ gen.writeStartObject();
+ for (Map.Entry<Object, Object> entry : ((Map<Object, Object>)
value).entrySet()) {
+ gen.writeFieldName(entry.getKey().toString());
+ writeValue(gen, type.getMapValueType(), entry.getValue());
+ }
+ gen.writeEndObject();
+ break;
case LOGICAL_TYPE:
String identifier = type.getLogicalType().getIdentifier();
if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
index f7a925d5c22..2179b20010d 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java
@@ -189,7 +189,14 @@ class RowJsonValueExtractors {
*/
static ValueExtractor<DateTime> datetimeValueExtractor() {
return ValidatingValueExtractor.<DateTime>builder()
- .setExtractor(jsonNode -> DateTime.parse(jsonNode.textValue()))
+ .setExtractor(
+ jsonNode -> {
+ // Accept "yyyy-MM-dd HH:mm:ss"-style input by turning only the first space
+ // (the date/time separator) into ISO-8601 'T'; later spaces are kept.
+ String text = jsonNode.textValue();
+ int sep = text.indexOf(' ');
+ return DateTime.parse(sep < 0 ? text : text.substring(0, sep) + 'T' + text.substring(sep + 1));
+ })
.setValidator(JsonNode::isTextual)
.build();
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
index 328765bf7f1..81f69b62c53 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonTest.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
@@ -80,7 +82,8 @@ public class RowJsonTest {
makeArrayOfArraysTestCase(),
makeNestedRowTestCase(),
makeDoublyNestedRowTestCase(),
- makeNullsTestCase());
+ makeNullsTestCase(),
+ makeMapFieldTestCase());
}
private static Object[] makeFlatRowTestCase() {
@@ -244,6 +247,21 @@ public class RowJsonTest {
return new Object[] {"Nulls", schema, rowString, expectedRow};
}
+ private static Object[] makeMapFieldTestCase() {
+ // MAP<STRING, INT32> field whose JSON form is an object of string -> int.
+ Schema schema =
+ Schema.builder().addMapField("f_map", FieldType.STRING, FieldType.INT32).build();
+ String rowString = "{\n" + "\"f_map\" : {\"key1\": 1, \"key2\": 2}\n" + "}";
+
+ Map<String, Integer> expectedMap = new HashMap<>();
+ expectedMap.put("key1", 1);
+ expectedMap.put("key2", 2);
+ Row expectedRow = Row.withSchema(schema).addValues(expectedMap).build();
+
+ // Same {name, schema, json, expected row} shape as the other test cases.
+ return new Object[] {"Map field", schema, rowString, expectedRow};
+ }
+
@Test
public void testDeserialize() throws IOException {
Row parsedRow =
@@ -564,6 +582,12 @@ public class RowJsonTest {
testSupportedConversion(FieldType.DATETIME, quoted(DATETIME_STRING),
DATETIME_VALUE);
}
+ @Test
+ public void testSupportedDatetimeWithSpaceConversions() throws Exception {
+ // Same value as DATETIME_STRING, but with ' ' in place of 'T'.
+ testSupportedConversion(FieldType.DATETIME, quoted(DATETIME_STRING.replace('T', ' ')), DATETIME_VALUE);
+ }
+
private void testSupportedConversion(
FieldType fieldType, String jsonFieldValue, Object
expectedRowFieldValue) throws Exception {