gemini-code-assist[bot] commented on code in PR #38772:
URL: https://github.com/apache/beam/pull/38772#discussion_r3383170803
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +77,104 @@ public static Document toDocument(Row row) {
}
return value;
}
+
+ /** Converts a BSON {@link Document} to a Beam {@link Row} matching the
given {@link Schema}. */
+ public static Row toRow(Document doc, Schema schema) {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ for (Field field : schema.getFields()) {
+ Object value = doc.get(field.getName());
+ rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+ }
+ return rowBuilder.build();
+ }
+
+ @SuppressWarnings({"nullness", "JavaUtilDate"})
+ private static @Nullable Object convertFromBsonValue(
+ @Nullable Object value, FieldType fieldType) {
+ if (value == null || value instanceof BsonNull) {
+ return null;
+ }
+
+ switch (fieldType.getTypeName()) {
+ case BYTE:
+ return (value instanceof Number)
+ ? ((Number) value).byteValue()
+ : Byte.parseByte(value.toString());
+ case INT16:
+ return (value instanceof Number)
+ ? ((Number) value).shortValue()
+ : Short.parseShort(value.toString());
+ case INT32:
+ return (value instanceof Number)
+ ? ((Number) value).intValue()
+ : Integer.parseInt(value.toString());
+ case INT64:
+ return (value instanceof Number)
+ ? ((Number) value).longValue()
+ : Long.parseLong(value.toString());
+ case FLOAT:
+ return (value instanceof Number)
+ ? ((Number) value).floatValue()
+ : Float.parseFloat(value.toString());
+ case DOUBLE:
+ return (value instanceof Number)
+ ? ((Number) value).doubleValue()
+ : Double.parseDouble(value.toString());
+ case DECIMAL:
+ return (value instanceof Number)
+ ? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
+ : new java.math.BigDecimal(value.toString());
+ case STRING:
+ return value.toString();
+ case BOOLEAN:
+ return (value instanceof Boolean)
+ ? (Boolean) value
+ : Boolean.parseBoolean(value.toString());
+ case DATETIME:
+ if (value instanceof java.util.Date) {
+ return new Instant(((java.util.Date) value).getTime());
+ } else if (value instanceof Number) {
+ return new Instant(((Number) value).longValue());
+ } else {
+ return Instant.parse(value.toString());
+ }
+ case BYTES:
+ if (value instanceof Binary) {
+ return ((Binary) value).getData();
+ } else if (value instanceof byte[]) {
+ return (byte[]) value;
+ } else {
+ return
value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+ case ARRAY:
+ case ITERABLE:
+ Iterable<?> iterable = (Iterable<?>) value;
+ List<Object> rowList = new ArrayList<>();
+ FieldType elementType =
Objects.requireNonNull(fieldType.getCollectionElementType());
+ for (Object item : iterable) {
+ rowList.add(convertFromBsonValue(item, elementType));
+ }
+ return rowList;
+ case MAP:
+ Map<?, ?> map = (Map<?, ?>) value;
+ Map<String, Object> rowMap = new HashMap<>();
+ FieldType valueType =
Objects.requireNonNull(fieldType.getMapValueType());
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ rowMap.put(
+ String.valueOf(entry.getKey()),
convertFromBsonValue(entry.getValue(), valueType));
+ }
+ return rowMap;
+ case ROW:
+ Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema());
+ if (value instanceof Document) {
+ return toRow((Document) value, rowSchema);
+ } else if (value instanceof Map) {
+ return toRow(new Document((Map<String, Object>) value), rowSchema);
+ } else {
+ throw new IllegalArgumentException("Cannot convert value to Row: " +
value);
+ }
Review Comment:

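Once `toRow` is changed to accept `Map<String, Object>` (see the suggested
signature change on `toRow`), the nested `Map` can be passed to `toRow`
directly instead of being copied into a new `Document`. Because `Document`
itself implements `Map<String, Object>`, a single `instanceof Map` check also
covers the existing `Document` branch. This avoids an extra allocation and copy
for every nested row.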
```java
case ROW:
Schema rowSchema = Objects.requireNonNull(fieldType.getRowSchema());
if (value instanceof Map) {
return toRow((Map<String, Object>) value, rowSchema);
} else {
throw new IllegalArgumentException("Cannot convert value to Row: "
+ value);
}
```
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -724,6 +726,59 @@ def write_to_tfrecord(
compression_type=getattr(CompressionTypes, compression_type))
+@beam.ptransform_fn
+@yaml_errors.maybe_with_exception_handling_transform_fn
+def read_from_mongodb(
+ root,
+ *,
+ database: str,
+ collection: str,
+ schema: Union[str, dict[str, Any]],
+ connection_uri: Optional[str] = None,
+ filter: Optional[dict[str, Any]] = None,
+ projection: Optional[Union[list[str], dict[str, Any]]] = None,
+ extra_client_params: Optional[dict[str, Any]] = None,
+ bucket_auto: bool = False):
+ """Reads data from MongoDB.
+
+ The resulting PCollection consists of rows with fields matching the provided
+ schema.
+
+ Args:
+ database: The MongoDB database name.
+ collection: The MongoDB collection name.
+ schema: JSON schema specifying the fields to select and their types.
+ connection_uri: The MongoDB connection string. e.g.
"mongodb://localhost:27017"
+ filter: A JSON/bson mapping specifying elements which must be present.
+ projection: A list of field names that should be returned or a dict
+ specifying the fields to include/exclude.
+ extra_client_params: Optional MongoClient parameters.
+ bucket_auto: If True, use MongoDB $bucketAuto aggregation to split
+ collection into bundles instead of splitVector command.
+ """
+ if isinstance(schema, str):
+ schema = json.loads(schema)
Review Comment:

In MongoDB, filters are BSON/JSON documents. In YAML pipelines, users may
specify the filter as a JSON string, which is also what the Java
SchemaTransform configuration expects. For compatibility and robustness,
`filter` should be parsed with `json.loads` when it is passed as a string, just
as `schema` already is. Its type hint would then need to be widened to
`Optional[Union[str, dict[str, Any]]]`.
```suggestion
if isinstance(schema, str):
schema = json.loads(schema)
if isinstance(filter, str):
filter = json.loads(filter)
```
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +77,104 @@ public static Document toDocument(Row row) {
}
return value;
}
+
+ /** Converts a BSON {@link Document} to a Beam {@link Row} matching the
given {@link Schema}. */
+ public static Row toRow(Document doc, Schema schema) {
+ Row.Builder rowBuilder = Row.withSchema(schema);
+ for (Field field : schema.getFields()) {
+ Object value = doc.get(field.getName());
+ rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+ }
+ return rowBuilder.build();
+ }
Review Comment:

Changing the signature of `toRow` to accept `Map<String, Object>` instead of
`Document` allows it to be called with any map implementation (including
`Document` since it implements `Map`). This enables us to avoid copying nested
maps into new `Document` instances in the `ROW` case, improving performance and
reducing memory overhead.
```suggestion
/** Converts a BSON {@link Document} to a Beam {@link Row} matching the
given {@link Schema}. */
public static Row toRow(Map<String, Object> doc, Schema schema) {
Row.Builder rowBuilder = Row.withSchema(schema);
for (Field field : schema.getFields()) {
Object value = doc.get(field.getName());
rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
}
return rowBuilder.build();
}
```
##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.service.AutoService;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.bson.Document;
+
+/** An implementation of {@link TypedSchemaTransformProvider} for reading from
MongoDB. */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+ extends
TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> {
+
+ private static final String OUTPUT_TAG_NAME = "output";
+ public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+
+ private static final org.apache.beam.sdk.metrics.Counter errorCounter =
+ org.apache.beam.sdk.metrics.Metrics.counter(
+ MongoDbReadSchemaTransformProvider.class,
"MongoDB-read-error-counter");
+
+ @Override
+ protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration
configuration) {
+ return new MongoDbReadSchemaTransform(configuration);
+ }
+
+ @Override
+ public String identifier() {
+ return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+ }
+
+ @Override
+ public List<String> inputCollectionNames() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<String> outputCollectionNames() {
+ return Collections.singletonList(OUTPUT_TAG_NAME);
+ }
+
+ /** The {@link SchemaTransform} that performs the read operation. */
+ private static class MongoDbReadSchemaTransform extends SchemaTransform {
+ private final MongoDbReadSchemaTransformConfiguration configuration;
+
+ MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration
configuration) {
+ configuration.validate();
+ this.configuration = configuration;
+ }
+
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ Schema schema =
JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());
+
+ MongoDbIO.Read read =
+ MongoDbIO.read()
+ .withUri(configuration.getUri())
+ .withDatabase(configuration.getDatabase())
+ .withCollection(configuration.getCollection());
+
+ final String filterStr = configuration.getFilter();
+ if (filterStr != null) {
+ read =
+ read.withQueryFn(
+ new SerializableFunction<MongoCollection<Document>,
MongoCursor<Document>>() {
+ @Override
+ public MongoCursor<Document> apply(MongoCollection<Document>
collection) {
+ return
collection.find(Document.parse(filterStr)).iterator();
+ }
+ });
+ }
Review Comment:

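The anonymous inner class passed to `withQueryFn` is created inside an
instance method, so it holds a hidden reference to the enclosing
`MongoDbReadSchemaTransform`. `SchemaTransform` extends `PTransform`, which is
`Serializable`. Even so, this capture drags the whole transform, including its
`configuration` field, into the serialized query function. If anything
reachable from it is not serializable, this fails with a
`NotSerializableException` when the pipeline is submitted to a distributed
runner. A lambda avoids capturing the outer instance because it only references
the effectively final local variable `filterStr`. It is still serializable
because its target type, `SerializableFunction`, extends `Serializable`.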
```java
final String filterStr = configuration.getFilter();
if (filterStr != null) {
read =
read.withQueryFn(
collection ->
collection.find(Document.parse(filterStr)).iterator());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org