gemini-code-assist[bot] commented on code in PR #38772: URL: https://github.com/apache/beam/pull/38772#discussion_r3391114719
########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; + +/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. 
*/ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG_NAME = "output"; + public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; + + private static final org.apache.beam.sdk.metrics.Counter errorCounter = + org.apache.beam.sdk.metrics.Metrics.counter( + MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG_NAME); + } + + /** The {@link SchemaTransform} that performs the read operation. 
*/ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); + + MongoDbIO.Read read = + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + final String filterStr = configuration.getFilter(); + if (filterStr != null) { + read = + read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator()); + } + + PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read); + + boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling()); + Schema errorSchema = ErrorHandling.errorSchemaBytes(); + + PCollectionTuple outputTuple = + mongoDocs.apply( + "ConvertToBeamRows", + ParDo.of(new DocumentToRowFn(schema, handleErrors, errorSchema)) + .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG))); + + PCollection<Row> beamRows = outputTuple.get(OUTPUT_TAG).setRowSchema(schema); + PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema); + + PCollectionRowTuple output = PCollectionRowTuple.of(OUTPUT_TAG_NAME, beamRows); + ErrorHandling errorHandling = configuration.getErrorHandling(); + if (handleErrors && errorHandling != null) { + output = output.and(errorHandling.getOutput(), errorOutput); + } + return output; + } + } + + /** Converts a MongoDB BSON {@link Document} to a Beam {@link Row}. 
*/ + static class DocumentToRowFn extends DoFn<Document, Row> { + private final Schema schema; + private final boolean handleErrors; + private final Schema errorSchema; + + DocumentToRowFn(Schema schema, boolean handleErrors, Schema errorSchema) { + this.schema = schema; + this.handleErrors = handleErrors; + this.errorSchema = errorSchema; + } + + @ProcessElement + public void processElement(@Element Document doc, MultiOutputReceiver receiver) { + try { + receiver.get(OUTPUT_TAG).output(MongoDbUtils.toRow(doc, schema)); + } catch (Exception e) { + if (!handleErrors) { + throw new RuntimeException( + "Failed to convert BSON Document to Beam Row: " + doc.toJson(), e); + } + errorCounter.inc(); + byte[] docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8); + receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); Review Comment:  If `doc.toJson()` throws an exception (e.g., due to unsupported BSON types or serialization issues), calling it again inside the `catch` block will cause another exception to be thrown, bypassing the error handling mechanism and crashing the pipeline. Wrapping `doc.toJson()` in a try-catch block and falling back to `doc.toString()` ensures robust error handling. 
```java byte[] docBytes; try { docBytes = doc.toJson().getBytes(java.nio.charset.StandardCharsets.UTF_8); } catch (Exception jsonEx) { docBytes = doc.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); } receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, docBytes, e)); ``` ########## sdks/python/apache_beam/yaml/yaml_io.py: ########## @@ -724,6 +726,61 @@ def write_to_tfrecord( compression_type=getattr(CompressionTypes, compression_type)) +@beam.ptransform_fn +@yaml_errors.maybe_with_exception_handling_transform_fn +def read_from_mongodb( + root, + *, + database: str, + collection: str, + schema: Union[str, dict[str, Any]], + connection_uri: Optional[str] = None, + filter: Optional[dict[str, Any]] = None, + projection: Optional[Union[list[str], dict[str, Any]]] = None, + extra_client_params: Optional[dict[str, Any]] = None, + bucket_auto: bool = False): + """Reads data from MongoDB. + + The resulting PCollection consists of rows with fields matching the provided + schema. + + Args: + database: The MongoDB database name. + collection: The MongoDB collection name. + schema: JSON schema specifying the fields to select and their types. + connection_uri: The MongoDB connection string. e.g. "mongodb://localhost:27017" + filter: A JSON/bson mapping specifying elements which must be present. + projection: A list of field names that should be returned or a dict + specifying the fields to include/exclude. + extra_client_params: Optional MongoClient parameters. + bucket_auto: If True, use MongoDB $bucketAuto aggregation to split + collection into bundles instead of splitVector command. + """ + if isinstance(schema, str): + schema = json.loads(schema) + if isinstance(filter, str): + filter = json.loads(filter) Review Comment:  For consistency with `schema` and `filter`, we should also parse `projection` if it is provided as a JSON string in the YAML configuration. 
```suggestion if isinstance(filter, str): filter = json.loads(filter) if isinstance(projection, str): projection = json.loads(projection) ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.mongodb; + +import com.google.auto.service.AutoService; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling; +import org.apache.beam.sdk.schemas.utils.JsonUtils; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.bson.Document; + +/** An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. 
*/ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG_NAME = "output"; + public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {}; + public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {}; + + private static final org.apache.beam.sdk.metrics.Counter errorCounter = + org.apache.beam.sdk.metrics.Metrics.counter( + MongoDbReadSchemaTransformProvider.class, "MongoDB-read-error-counter"); + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + return Collections.singletonList(OUTPUT_TAG_NAME); + } + + /** The {@link SchemaTransform} that performs the read operation. 
*/ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema()); + + MongoDbIO.Read read = + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + final String filterStr = configuration.getFilter(); + if (filterStr != null) { + read = + read.withQueryFn(collection -> collection.find(Document.parse(filterStr)).iterator()); Review Comment:  The `.iterator()` call on `FindIterable` returns a `MongoCursor<Document>`, which does not match the expected return type of `withQueryFn` (which expects `FindIterable<Document>`). This will cause a compilation error. Removing `.iterator()` will correctly return the `FindIterable<Document>`. ```suggestion read.withQueryFn(collection -> collection.find(Document.parse(filterStr))); ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java: ########## @@ -71,4 +76,116 @@ public static Document toDocument(Row row) { } return value; } + + /** + * Converts a BSON {@link Document} (or any Map representing fields) to a Beam {@link Row} + * matching the given {@link Schema}. 
+ */ + public static Row toRow(Map<?, ?> doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings("JavaUtilDate") + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? ((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? 
(Boolean) value + : Boolean.parseBoolean(value.toString()); + case DATETIME: + if (value instanceof java.util.Date) { + return new Instant(((java.util.Date) value).getTime()); + } else if (value instanceof Number) { + return new Instant(((Number) value).longValue()); + } else { + return Instant.parse(value.toString()); + } + case BYTES: + if (value instanceof Binary) { + return ((Binary) value).getData(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + case ARRAY: + case ITERABLE: + Iterable<?> iterable = (Iterable<?>) value; Review Comment:  To prevent a `ClassCastException` when the BSON value does not match the expected schema type, we should defensively check if `value` is an instance of `Iterable` before casting. ```java if (!(value instanceof Iterable)) { throw new IllegalArgumentException( "Expected Iterable for type " + fieldType + ", but got: " + value.getClass().getName()); } Iterable<?> iterable = (Iterable<?>) value; ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java: ########## @@ -71,4 +76,116 @@ public static Document toDocument(Row row) { } return value; } + + /** + * Converts a BSON {@link Document} (or any Map representing fields) to a Beam {@link Row} + * matching the given {@link Schema}. + */ + public static Row toRow(Map<?, ?> doc, Schema schema) { + Row.Builder rowBuilder = Row.withSchema(schema); + for (Field field : schema.getFields()) { + Object value = doc.get(field.getName()); + rowBuilder.addValue(convertFromBsonValue(value, field.getType())); + } + return rowBuilder.build(); + } + + @SuppressWarnings("JavaUtilDate") + private static @Nullable Object convertFromBsonValue( + @Nullable Object value, FieldType fieldType) { + if (value == null || value instanceof BsonNull) { + return null; + } + + switch (fieldType.getTypeName()) { + case BYTE: + return (value instanceof Number) + ? 
((Number) value).byteValue() + : Byte.parseByte(value.toString()); + case INT16: + return (value instanceof Number) + ? ((Number) value).shortValue() + : Short.parseShort(value.toString()); + case INT32: + return (value instanceof Number) + ? ((Number) value).intValue() + : Integer.parseInt(value.toString()); + case INT64: + return (value instanceof Number) + ? ((Number) value).longValue() + : Long.parseLong(value.toString()); + case FLOAT: + return (value instanceof Number) + ? ((Number) value).floatValue() + : Float.parseFloat(value.toString()); + case DOUBLE: + return (value instanceof Number) + ? ((Number) value).doubleValue() + : Double.parseDouble(value.toString()); + case DECIMAL: + return (value instanceof Number) + ? java.math.BigDecimal.valueOf(((Number) value).doubleValue()) + : new java.math.BigDecimal(value.toString()); + case STRING: + return value.toString(); + case BOOLEAN: + return (value instanceof Boolean) + ? (Boolean) value + : Boolean.parseBoolean(value.toString()); + case DATETIME: + if (value instanceof java.util.Date) { + return new Instant(((java.util.Date) value).getTime()); + } else if (value instanceof Number) { + return new Instant(((Number) value).longValue()); + } else { + return Instant.parse(value.toString()); + } + case BYTES: + if (value instanceof Binary) { + return ((Binary) value).getData(); + } else if (value instanceof byte[]) { + return (byte[]) value; + } else { + return value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8); + } + case ARRAY: + case ITERABLE: + Iterable<?> iterable = (Iterable<?>) value; + List<@Nullable Object> rowList = new ArrayList<>(); + FieldType elementType = fieldType.getCollectionElementType(); + if (elementType == null) { + throw new IllegalArgumentException( + "Collection element type cannot be null for type: " + fieldType); + } + for (Object item : iterable) { + rowList.add(convertFromBsonValue(item, elementType)); + } + return rowList; + case MAP: + Map<?, ?> map = (Map<?, ?>) 
value; Review Comment:  To prevent a `ClassCastException` when the BSON value does not match the expected schema type, we should defensively check if `value` is an instance of `Map` before casting. ```java if (!(value instanceof Map)) { throw new IllegalArgumentException( "Expected Map for type " + fieldType + ", but got: " + value.getClass().getName()); } Map<?, ?> map = (Map<?, ?>) value; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
