derrickaw commented on code in PR #38772:
URL: https://github.com/apache/beam/pull/38772#discussion_r3389426750
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +77,104 @@ public static Document toDocument(Row row) {
}
return value;
}
+
+ /** Converts a BSON {@link Document} to a Beam {@link Row} matching the
given {@link Schema}. */
+ public static Row toRow(Document doc, Schema schema) {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ for (Field field : schema.getFields()) {
+ Object value = doc.get(field.getName());
+ rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+ }
+ return rowBuilder.build();
+ }
+
+ @SuppressWarnings({"nullness", "JavaUtilDate"})
+ private static @Nullable Object convertFromBsonValue(
+ @Nullable Object value, FieldType fieldType) {
+ if (value == null || value instanceof BsonNull) {
+ return null;
+ }
+
+ switch (fieldType.getTypeName()) {
+ case BYTE:
+ return (value instanceof Number)
+ ? ((Number) value).byteValue()
+ : Byte.parseByte(value.toString());
+ case INT16:
+ return (value instanceof Number)
+ ? ((Number) value).shortValue()
+ : Short.parseShort(value.toString());
+ case INT32:
+ return (value instanceof Number)
+ ? ((Number) value).intValue()
+ : Integer.parseInt(value.toString());
+ case INT64:
+ return (value instanceof Number)
+ ? ((Number) value).longValue()
+ : Long.parseLong(value.toString());
+ case FLOAT:
+ return (value instanceof Number)
+ ? ((Number) value).floatValue()
+ : Float.parseFloat(value.toString());
+ case DOUBLE:
+ return (value instanceof Number)
+ ? ((Number) value).doubleValue()
+ : Double.parseDouble(value.toString());
+ case DECIMAL:
+ return (value instanceof Number)
+ ? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
+ : new java.math.BigDecimal(value.toString());
+ case STRING:
+ return value.toString();
+ case BOOLEAN:
+ return (value instanceof Boolean)
+ ? (Boolean) value
+ : Boolean.parseBoolean(value.toString());
+ case DATETIME:
+ if (value instanceof java.util.Date) {
+ return new Instant(((java.util.Date) value).getTime());
+ } else if (value instanceof Number) {
+ return new Instant(((Number) value).longValue());
+ } else {
+ return Instant.parse(value.toString());
+ }
+ case BYTES:
+ if (value instanceof Binary) {
+ return ((Binary) value).getData();
+ } else if (value instanceof byte[]) {
+ return (byte[]) value;
+ } else {
+ return
value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+ case ARRAY:
+ case ITERABLE:
+ Iterable<?> iterable = (Iterable<?>) value;
+ List<Object> rowList = new ArrayList<>();
+ FieldType elementType =
Objects.requireNonNull(fieldType.getCollectionElementType());
+ for (Object item : iterable) {
+ rowList.add(convertFromBsonValue(item, elementType));
+ }
+ return rowList;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) value;
+ Map<String, Object> rowMap = new HashMap<>();
+ FieldType valueType =
Objects.requireNonNull(fieldType.getMapValueType());
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ rowMap.put(
+ String.valueOf(entry.getKey()),
convertFromBsonValue(entry.getValue(), valueType));
+ }
+ return rowMap;
+ case ROW:
+ Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema());
+ if (value instanceof Document) {
+ return toRow((Document) value, rowSchema);
+ } else if (value instanceof Map) {
+ return toRow(new Document((Map<String, Object>) value), rowSchema);
+ } else {
+ throw new IllegalArgumentException("Cannot convert value to Row: " +
value);
+ }
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]