derrickaw commented on code in PR #38772:
URL: https://github.com/apache/beam/pull/38772#discussion_r3430645285
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -724,6 +726,61 @@ def write_to_tfrecord(
compression_type=getattr(CompressionTypes, compression_type))
+@beam.ptransform_fn
+@yaml_errors.maybe_with_exception_handling_transform_fn
+def read_from_mongodb(
+ root,
+ *,
+ database: str,
+ collection: str,
+ schema: Union[str, dict[str, Any]],
+ connection_uri: Optional[str] = None,
+ filter: Optional[dict[str, Any]] = None,
+ projection: Optional[Union[list[str], dict[str, Any]]] = None,
+ extra_client_params: Optional[dict[str, Any]] = None,
+ bucket_auto: bool = False):
+ """Reads data from MongoDB.
+
+ The resulting PCollection consists of rows with fields matching the provided
+ schema.
+
+ Args:
+ database: The MongoDB database name.
+ collection: The MongoDB collection name.
+ schema: JSON schema specifying the fields to select and their types.
+ connection_uri: The MongoDB connection string. e.g.
"mongodb://localhost:27017"
+ filter: A JSON/bson mapping specifying elements which must be present.
+ projection: A list of field names that should be returned or a dict
+ specifying the fields to include/exclude.
+ extra_client_params: Optional MongoClient parameters.
+ bucket_auto: If True, use MongoDB $bucketAuto aggregation to split
+ collection into bundles instead of splitVector command.
+ """
+ if isinstance(schema, str):
+ schema = json.loads(schema)
+ if isinstance(filter, str):
+ filter = json.loads(filter)
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org