derrickaw commented on code in PR #38772:
URL: https://github.com/apache/beam/pull/38772#discussion_r3430620590


##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +76,116 @@ public static Document toDocument(Row row) {
     }
     return value;
   }
+
+  /**
+   * Converts a BSON {@link Document} (or any Map representing fields) to a 
Beam {@link Row}
+   * matching the given {@link Schema}.
+   */
+  public static Row toRow(Map<?, ?> doc, Schema schema) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Field field : schema.getFields()) {
+      Object value = doc.get(field.getName());
+      rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+    }
+    return rowBuilder.build();
+  }
+
+  @SuppressWarnings("JavaUtilDate")
+  private static @Nullable Object convertFromBsonValue(
+      @Nullable Object value, FieldType fieldType) {
+    if (value == null || value instanceof BsonNull) {
+      return null;
+    }
+
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+        return (value instanceof Number)
+            ? ((Number) value).byteValue()
+            : Byte.parseByte(value.toString());
+      case INT16:
+        return (value instanceof Number)
+            ? ((Number) value).shortValue()
+            : Short.parseShort(value.toString());
+      case INT32:
+        return (value instanceof Number)
+            ? ((Number) value).intValue()
+            : Integer.parseInt(value.toString());
+      case INT64:
+        return (value instanceof Number)
+            ? ((Number) value).longValue()
+            : Long.parseLong(value.toString());
+      case FLOAT:
+        return (value instanceof Number)
+            ? ((Number) value).floatValue()
+            : Float.parseFloat(value.toString());
+      case DOUBLE:
+        return (value instanceof Number)
+            ? ((Number) value).doubleValue()
+            : Double.parseDouble(value.toString());
+      case DECIMAL:
+        return (value instanceof Number)
+            ? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
+            : new java.math.BigDecimal(value.toString());
+      case STRING:
+        return value.toString();
+      case BOOLEAN:
+        return (value instanceof Boolean)
+            ? (Boolean) value
+            : Boolean.parseBoolean(value.toString());
+      case DATETIME:
+        if (value instanceof java.util.Date) {
+          return new Instant(((java.util.Date) value).getTime());
+        } else if (value instanceof Number) {
+          return new Instant(((Number) value).longValue());
+        } else {
+          return Instant.parse(value.toString());
+        }
+      case BYTES:
+        if (value instanceof Binary) {
+          return ((Binary) value).getData();
+        } else if (value instanceof byte[]) {
+          return (byte[]) value;
+        } else {
+          return 
value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }
+      case ARRAY:
+      case ITERABLE:
+        Iterable<?> iterable = (Iterable<?>) value;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to