gemini-code-assist[bot] commented on code in PR #38772: URL: https://github.com/apache/beam/pull/38772#discussion_r3467765967
########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; + +/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. 
*/ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG_NAME = "output"; + public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; + + private static final org.apache.beam.sdk.metrics.Counter errorCounter = + org.apache.beam.sdk.metrics.Metrics.counter( + MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG_NAME); + } + + /** The {@link SchemaTransform} that performs the read operation. 
*/ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); + + MongoDbIO.Read read = + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + final String filterStr = configuration.getFilter(); + if (filterStr != null) { + read = read.withQueryFn(FindQuery.create().withFilters(Document.parse(filterStr))); + } Review Comment:  Using `Document.parse(filterStr)` returns an `org.bson.Document`, which is not serializable and does not match the `BsonDocument` type expected by `FindQuery.withFilters()`. This can lead to compilation errors or `NotSerializableException` at runtime when running on distributed runners. Use `BsonDocument.parse(filterStr)` instead. ```suggestion final String filterStr = configuration.getFilter(); if (filterStr != null) { read = read.withQueryFn(FindQuery.create().withFilters(org.bson.BsonDocument.parse(filterStr))); } ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java: ########## @@ -71,4 +76,127 @@ public static Document toDocument(Row row) { } return value; } + + /** + * Converts a BSON {@link Document} (or any Map representing fields) to a Beam {@link Row} + * matching the given {@link Schema}. 
+ */ + public static Row toRow(Map<?, ?> doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings("JavaUtilDate") + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? ((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? (Boolean) value + : Boolean.parseBoolean(value.toString()); Review Comment:  Using `Boolean.parseBoolean(value.toString())` for non-boolean types will return `false` for any string other than `"true"` (compared case-insensitively). If MongoDB stores booleans as numbers (e.g., `1` for `true` and `0` for `false`), this will incorrectly convert `1` to `false`. It is safer to handle `Number` types explicitly.
```java case BOOLEAN: if (value instanceof Boolean) { return (Boolean) value; } else if (value instanceof Number) { return ((Number) value).intValue() != 0; } else { return Boolean.parseBoolean(value.toString()); } ``` ########## sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.mongodb; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link MongoDbReadSchemaTransformProvider}. */ +@RunWith(JUnit4.class) +public class MongoDbReadSchemaTransformProviderTest { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + @Test + public void testInvalidConfigMissingUri() { + assertThrows( + IllegalStateException.class, + () -> { + MongoDbReadSchemaTransformConfiguration.builder() + .setDatabase("db") + .setCollection("col") + .setSchema("{}") + .build() + .validate(); + }); + } Review Comment:  This test (and the subsequent ones for missing database, collection, and schema) asserts that `IllegalStateException` is thrown. However, this exception is thrown by AutoValue's `build()` method because the required fields are null, meaning `validate()` is never actually called or tested. To test the `validate()` method's validation of empty strings, you should set empty strings on the builder and assert that `IllegalArgumentException` is thrown. 
```suggestion @Test public void testInvalidConfigEmptyUri() { assertThrows( IllegalArgumentException.class, () -> { MongoDbReadSchemaTransformConfiguration.builder() .setUri("") .setDatabase("db") .setCollection("col") .setSchema("{}") .build() .validate(); }); } ``` ########## sdks/python/apache_beam/yaml/yaml_io.py: ########## @@ -739,6 +741,63 @@ def write_to_tfrecord( compression_type=getattr(CompressionTypes, compression_type)) +@beam.ptransform.ptransform_fn +@yaml_errors.maybe_with_exception_handling_transform_fn +def read_from_mongodb( + root, + *, + database: str, + collection: str, + schema: Union[str, dict[str, Any]], + connection_uri: Optional[str] = None, + filter: Optional[dict[str, Any]] = None, + projection: Optional[Union[list[str], dict[str, Any]]] = None, + extra_client_params: Optional[dict[str, Any]] = None, + bucket_auto: bool = False): + """Reads data from MongoDB. + + The resulting PCollection consists of rows with fields matching the provided + schema. + + Args: + database: The MongoDB database name. + collection: The MongoDB collection name. + schema: JSON schema specifying the fields to select and their types. + connection_uri: The MongoDB connection string. e.g. "mongodb://localhost:27017" + filter: A JSON/bson mapping specifying elements which must be present. + projection: A list of field names that should be returned or a dict + specifying the fields to include/exclude. + extra_client_params: Optional MongoClient parameters. + bucket_auto: If True, use MongoDB $bucketAuto aggregation to split + collection into bundles instead of splitVector command.
+ """ + if isinstance(schema, str): + schema = json.loads(schema) + if isinstance(filter, str): + filter = json.loads(filter) + if isinstance(projection, str): + projection = json.loads(projection) + + beam_schema = json_utils.json_schema_to_beam_schema(schema) + beam_type = schema_pb2.FieldType( + row_type=schema_pb2.RowType(schema=beam_schema)) + to_row_fn = json_utils.json_to_row(beam_type) + + output = ( + root + | mongodbio.ReadFromMongoDB( + uri=connection_uri, + db=database, + coll=collection, + filter=filter, + projection=projection, + extra_client_params=extra_client_params, + bucket_auto=bucket_auto) + | beam.Map(to_row_fn)) Review Comment:  The `mongodbio.ReadFromMongoDB` transform returns native BSON types (such as `bson.objectid.ObjectId` for `_id` and `datetime.datetime` for dates). Passing these directly to `json_utils.json_to_row` will result in `TypeError` or conversion failures if those fields are included in the schema, because `json_to_row` expects standard JSON types. Consider adding a normalization step to convert BSON-specific types (like `ObjectId` to `str`) before passing them to `to_row_fn`. ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Configuration class for the MongoDB Read transform. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "The schema in which the data is encoded, defined with JSON-schema syntax (https://json-schema.org/).") + public abstract String getSchema(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. 
This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + @SchemaFieldDescription( + "This option specifies whether and where to output rows that failed to be read.") + @Nullable + public abstract ErrorHandling getErrorHandling(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + checkArgument( + getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be specified."); + } Review Comment:  Since `getUri()`, `getDatabase()`, `getCollection()`, and `getSchema()` are not annotated with `@Nullable`, AutoValue's generated `build()` method will automatically throw a `NullPointerException` or `IllegalStateException` if any of these fields are null. Therefore, the null checks in `validate()` are redundant and can be simplified to only check for empty strings. ```java public void validate() { checkArgument(!getUri().isEmpty(), "MongoDB URI must be specified."); checkArgument(!getDatabase().isEmpty(), "MongoDB database must be specified."); checkArgument(!getCollection().isEmpty(), "MongoDB collection must be specified."); checkArgument(!getSchema().isEmpty(), "MongoDB schema must be specified."); } ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java: ########## @@ -71,4 +76,127 @@ public static Document toDocument(Row row) { } return value; } + + /** + * Converts a BSON {@link Document} (or any Map representing fields) to a Beam {@link Row} + * matching the given {@link Schema}. 
+ */ + public static Row toRow(Map<?, ?> doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings("JavaUtilDate") + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? ((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? 
(Boolean) value + : Boolean.parseBoolean(value.toString()); + case DATETIME: + if (value instanceof java.util.Date) { + return new Instant(((java.util.Date) value).getTime()); + } else if (value instanceof Number) { + return new Instant(((Number) value).longValue()); + } else { + return Instant.parse(value.toString()); + } + case BYTES: + if (value instanceof Binary) { + return ((Binary) value).getData(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); + } Review Comment:  When converting an `ObjectId` (such as the default `_id` field in MongoDB) to `BYTES`, calling `value.toString().getBytes(...)` will return the 24-byte UTF-8 representation of the hex string instead of the actual 12-byte binary representation. It is highly recommended to handle `org.bson.types.ObjectId` explicitly by calling `toByteArray()`. ```suggestion case BYTES: if (value instanceof Binary) { return ((Binary) value).getData(); } else if (value instanceof byte[]) { return (byte[]) value; } else if (value instanceof org.bson.types.ObjectId) { return ((org.bson.types.ObjectId) value).toByteArray(); } else { return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
