This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bcb56b7 Merge pull request #14927 from [BEAM-12400] MongoDBIO support
for update within documents
bcb56b7 is described below
commit bcb56b704610e912a74eb0718b68d2a6a0ca0ccf
Author: pareshsarafmdb <[email protected]>
AuthorDate: Mon Sep 6 10:27:47 2021 +0530
Merge pull request #14927 from [BEAM-12400] MongoDBIO support for update
within documents
* Added the ability to update existing documents using MongoDbIO write
* Fixes related to formatting and MongoDB updates
* Updated the Javadoc for the update enhancement
* Addressed review comments on the MongoDbIO update connector
* MongoDbIO: changed UpdateField to be an AutoValue class
* Addressed review comments: Javadoc and UpdateField refactoring
---
.../org/apache/beam/sdk/io/mongodb/MongoDbIO.java | 101 ++++++++++++++++++++-
.../beam/sdk/io/mongodb/UpdateConfiguration.java | 82 +++++++++++++++++
.../apache/beam/sdk/io/mongodb/UpdateField.java | 71 +++++++++++++++
.../apache/beam/sdk/io/mongodb/MongoDbIOTest.java | 39 +++++++-
4 files changed, 286 insertions(+), 7 deletions(-)
diff --git
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index fa0065e..8c7f03b 100644
---
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -31,11 +31,17 @@ import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.beam.sdk.annotations.Experimental;
@@ -93,8 +99,8 @@ import org.slf4j.LoggerFactory;
*
* <p>MongoDB sink supports writing of Document (as JSON String) in a MongoDB.
*
- * <p>To configure a MongoDB sink, you must specify a connection {@code URI},
a {@code Database}
- * name, a {@code Collection} name. For instance:
+ * <p>To configure a MongoDB sink that inserts documents, you must specify a connection {@code
+ * URI}, a {@code Database} name, and a {@code Collection} name. For instance:
*
* <pre>{@code
* pipeline
@@ -106,6 +112,27 @@ import org.slf4j.LoggerFactory;
* .withNumSplits(30))
*
* }</pre>
+ *
+ * <p>To configure a MongoDB sink that updates existing documents, you must specify a connection
+ * {@code URI}, a {@code Database} name, a {@code Collection} name, and an {@link
+ * UpdateConfiguration}. The value of the configured update key in each input document is matched
+ * against {@code _id} in the target collection. For instance:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(MongoDbIO.write()
+ *     .withUri("mongodb://localhost:27017")
+ *     .withDatabase("my-database")
+ *     .withCollection("my-collection")
+ *     .withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("key1")
+ *       .withUpdateFields(UpdateField.fieldUpdate("$set", "source-field1", "dest-field1"),
+ *         UpdateField.fieldUpdate("$set", "source-field2", "dest-field2"),
+ *         // pushes the entire input document to the dest field
+ *         UpdateField.fullUpdate("$push", "dest-field3"))));
+ *
+ * }</pre>
*/
@Experimental(Kind.SOURCE_SINK)
@SuppressWarnings({
@@ -490,7 +517,6 @@ public class MongoDbIO {
.anyMatch(s -> s.keySet().contains("$limit"))) {
return Collections.singletonList(this);
}
-
splitKeys = buildAutoBuckets(mongoDatabase, spec);
for (BsonDocument shardFilter : splitKeysToMatch(splitKeys)) {
@@ -744,6 +770,8 @@ public class MongoDbIO {
abstract long batchSize();
+ abstract @Nullable UpdateConfiguration updateConfiguration();
+
abstract Builder builder();
@AutoValue.Builder
@@ -766,6 +794,8 @@ public class MongoDbIO {
abstract Builder setBatchSize(long batchSize);
+ abstract Builder setUpdateConfiguration(UpdateConfiguration
updateConfiguration);
+
abstract Write build();
}
@@ -856,6 +886,10 @@ public class MongoDbIO {
return builder().setBatchSize(batchSize).build();
}
+ /**
+  * Returns a transform that updates existing documents according to {@code updateConfiguration}
+  * instead of inserting the input documents. Passing {@code null} keeps the default insert
+  * behavior.
+  */
+ public Write withUpdateConfiguration(UpdateConfiguration updateConfiguration) {
+   return builder().setUpdateConfiguration(updateConfiguration).build();
+ }
+
@Override
public PDone expand(PCollection<Document> input) {
checkArgument(uri() != null, "withUri() is required");
@@ -910,6 +944,7 @@ public class MongoDbIO {
public void processElement(ProcessContext ctx) {
// Need to copy the document because mongoCollection.insertMany() will
mutate it
// before inserting (will assign an id).
+
batch.add(new Document(ctx.element()));
if (batch.size() >= spec.batchSize()) {
flush();
@@ -927,6 +962,15 @@ public class MongoDbIO {
}
MongoDatabase mongoDatabase = client.getDatabase(spec.database());
MongoCollection<Document> mongoCollection =
mongoDatabase.getCollection(spec.collection());
+ if (spec.updateConfiguration() == null) {
+ insertDocuments(mongoCollection);
+ } else {
+ updateDocuments(mongoCollection);
+ }
+ batch.clear();
+ }
+
+ private void insertDocuments(MongoCollection<Document> mongoCollection) {
try {
mongoCollection.insertMany(batch, new
InsertManyOptions().ordered(spec.ordered()));
} catch (MongoBulkWriteException e) {
@@ -934,8 +978,57 @@ public class MongoDbIO {
throw e;
}
}
+ }
+
+ /**
+  * Writes the current batch as one bulk write of update-one operations.
+  *
+  * <p>For every buffered document, the value of the configured update key is used as the
+  * {@code _id} filter, and the configured update fields (grouped by update operator) form the
+  * update document.
+  *
+  * @throws IllegalArgumentException if no update key or no update fields are configured
+  */
+ private void updateDocuments(MongoCollection<Document> mongoCollection) {
+   if (batch.isEmpty()) {
+     return;
+   }
+   UpdateConfiguration updateConfiguration = spec.updateConfiguration();
+   String updateKey = updateConfiguration.updateKey();
+   List<UpdateField> updateFields = updateConfiguration.updateFields();
+   // Fail fast on misconfiguration: without a key every filter would be {_id: null}, and
+   // without fields the update document would be empty.
+   checkArgument(updateKey != null, "UpdateConfiguration.withUpdateKey() is required");
+   checkArgument(
+       updateFields != null && !updateFields.isEmpty(),
+       "UpdateConfiguration.withUpdateFields() requires at least one UpdateField");
+   Map<String, List<UpdateField>> operatorFieldsMap = getOperatorFieldsMap(updateFields);
+   // Identical for every document in the batch, so build it once.
+   UpdateOptions updateOptions = new UpdateOptions().upsert(updateConfiguration.isUpsert());
+   List<WriteModel<Document>> actions = new ArrayList<>(batch.size());
+   for (Document doc : batch) {
+     Document updateDocument = new Document();
+     for (Map.Entry<String, List<UpdateField>> entry : operatorFieldsMap.entrySet()) {
+       Document updateSubDocument = new Document();
+       for (UpdateField field : entry.getValue()) {
+         // A field without a source writes the whole input document.
+         updateSubDocument.append(
+             field.destField(),
+             field.sourceField() == null ? doc : doc.get(field.sourceField()));
+       }
+       updateDocument.append(entry.getKey(), updateSubDocument);
+     }
+     Document findCriteria = new Document("_id", doc.get(updateKey));
+     actions.add(new UpdateOneModel<>(findCriteria, updateDocument, updateOptions));
+   }
+   try {
+     mongoCollection.bulkWrite(actions, new BulkWriteOptions().ordered(spec.ordered()));
+   } catch (MongoBulkWriteException e) {
+     // With unordered writes, bulk-write failures are deliberately not rethrown.
+     if (spec.ordered()) {
+       throw e;
+     }
+   }
+ }
- batch.clear();
+ /**
+  * Groups update fields by their update operator, keeping the relative order of fields that
+  * share an operator. A {@code null} list yields an empty map.
+  */
+ private static Map<String, List<UpdateField>> getOperatorFieldsMap(
+     List<UpdateField> updateFields) {
+   Map<String, List<UpdateField>> operatorFieldsMap = new HashMap<>();
+   if (updateFields == null) {
+     return operatorFieldsMap;
+   }
+   for (UpdateField field : updateFields) {
+     operatorFieldsMap
+         .computeIfAbsent(field.updateOperator(), operator -> new ArrayList<>())
+         .add(field);
+   }
+   return operatorFieldsMap;
}
@Teardown
diff --git
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
new file mode 100644
index 0000000..cda037d
--- /dev/null
+++
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateConfiguration.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Configuration for {@link MongoDbIO.Write} when it should update existing documents rather than
+ * insert new ones.
+ *
+ * <p>For every input document, the value stored under the configured update key is used as the
+ * {@code _id} filter of an update-one operation, and the configured {@link UpdateField update
+ * fields} are grouped by update operator to build the update document.
+ */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public abstract class UpdateConfiguration implements Serializable {
+
+ /** Name of the input-document field whose value is matched against {@code _id}. */
+ abstract @Nullable String updateKey();
+
+ /** Fields to update; an empty list unless {@link #withUpdateFields} was called. */
+ abstract @Nullable List<UpdateField> updateFields();
+
+ /** Whether MongoDB should insert a document when none matches the filter. */
+ abstract boolean isUpsert();
+
+ private static Builder builder() {
+   return new AutoValue_UpdateConfiguration.Builder()
+       .setUpdateFields(Collections.emptyList())
+       .setIsUpsert(false);
+ }
+
+ abstract Builder toBuilder();
+
+ /** Creates a configuration with no update key, no update fields, and upsert disabled. */
+ public static UpdateConfiguration create() {
+   return builder().build();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder {
+   abstract Builder setUpdateFields(@Nullable List<UpdateField> updateFields);
+
+   abstract Builder setUpdateKey(@Nullable String updateKey);
+
+   abstract Builder setIsUpsert(boolean isUpsert);
+
+   abstract UpdateConfiguration build();
+ }
+
+ /**
+  * Sets the fields to update. Each {@link UpdateField} carries an update operator (for example
+  * {@code $set}), an optional source field of the input document, and a destination field.
+  *
+  * <p>The given fields are copied, so later changes to an array passed as varargs do not affect
+  * this configuration.
+  */
+ public UpdateConfiguration withUpdateFields(UpdateField... updateFields) {
+   return toBuilder()
+       .setUpdateFields(Collections.unmodifiableList(Arrays.asList(updateFields.clone())))
+       .build();
+ }
+
+ /**
+  * Sets the name of the input-document field whose value is used as the {@code _id} filter when
+  * looking up the document to update.
+  */
+ public UpdateConfiguration withUpdateKey(String updateKey) {
+   return toBuilder().setUpdateKey(updateKey).build();
+ }
+
+ /**
+  * Sets whether a new document is inserted when no existing document matches the {@code _id}
+  * filter. Defaults to {@code false}.
+  */
+ public UpdateConfiguration withIsUpsert(boolean isUpsert) {
+   return toBuilder().setIsUpsert(isUpsert).build();
+ }
+}
diff --git
a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
new file mode 100644
index 0000000..a1c8423
--- /dev/null
+++
b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Describes one field update applied by {@link MongoDbIO.Write} when it is configured with an
+ * {@link UpdateConfiguration}.
+ *
+ * <p>Fields are grouped by their update operator (for example {@code $set} or {@code $push}).
+ * Under that operator, the destination field receives either the value of the source field from
+ * the input document or, when no source field is set, the whole input document.
+ */
+@Experimental(Kind.SOURCE_SINK)
+@AutoValue
+public abstract class UpdateField implements Serializable {
+
+ /** MongoDB update operator, such as {@code $set}. */
+ abstract @Nullable String updateOperator();
+
+ /** Input-document field to read; {@code null} means the whole input document is used. */
+ abstract @Nullable String sourceField();
+
+ /** Field that receives the value in the updated document. */
+ abstract @Nullable String destField();
+
+ private static Builder builder() {
+   return new AutoValue_UpdateField.Builder().setSourceField(null);
+ }
+
+ abstract UpdateField.Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+   abstract UpdateField.Builder setUpdateOperator(@Nullable String updateOperator);
+
+   abstract UpdateField.Builder setSourceField(@Nullable String sourceField);
+
+   abstract UpdateField.Builder setDestField(@Nullable String destField);
+
+   abstract UpdateField build();
+ }
+
+ /**
+  * Creates an update that writes the entire input document to {@code destField} using {@code
+  * updateOperator}.
+  *
+  * @throws IllegalArgumentException if {@code updateOperator} or {@code destField} is null
+  */
+ public static UpdateField fullUpdate(String updateOperator, String destField) {
+   return builder()
+       .setUpdateOperator(requireArgument(updateOperator, "updateOperator"))
+       .setDestField(requireArgument(destField, "destField"))
+       .build();
+ }
+
+ /**
+  * Creates an update that writes the value of {@code sourceField} from the input document to
+  * {@code destField} using {@code updateOperator}.
+  *
+  * @throws IllegalArgumentException if {@code updateOperator} or {@code destField} is null
+  */
+ public static UpdateField fieldUpdate(
+     String updateOperator, String sourceField, String destField) {
+   return builder()
+       .setUpdateOperator(requireArgument(updateOperator, "updateOperator"))
+       .setSourceField(sourceField)
+       .setDestField(requireArgument(destField, "destField"))
+       .build();
+ }
+
+ /** Returns {@code value}, or throws {@link IllegalArgumentException} naming it if null. */
+ private static String requireArgument(@Nullable String value, String name) {
+   if (value == null) {
+     throw new IllegalArgumentException(name + " must not be null");
+   }
+   return value;
+ }
+}
diff --git
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 89af792..3e2b62c 100644
---
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -103,7 +103,7 @@ public class MongoDbIOTest {
client = new MongoClient("localhost", port);
LOG.info("Insert test data");
- List<Document> documents = createDocuments(1000);
+ List<Document> documents = createDocuments(1000, false);
MongoCollection<Document> collection = getCollection(COLLECTION);
collection.insertMany(documents);
}
@@ -329,7 +329,7 @@ public class MongoDbIOTest {
final int numElements = 1000;
pipeline
- .apply(Create.of(createDocuments(numElements)))
+ .apply(Create.of(createDocuments(numElements, false)))
.apply(
MongoDbIO.write()
.withUri("mongodb://localhost:" + port)
@@ -361,7 +361,37 @@ public class MongoDbIOTest {
assertEquals(1, countElements(collectionName));
}
- private static List<Document> createDocuments(final int n) {
+ @Test
+ public void testUpdate() {
+   final String collectionName = "testUpdate";
+   final int numElements = 100;
+   // Seed documents carry explicit _id values 1..numElements.
+   getCollection(collectionName).insertMany(createDocuments(numElements, true));
+   assertEquals(numElements, countElements(collectionName));
+
+   // The update key "id" is matched against _id, so this targets the seeded document with _id 1.
+   Document doc =
+       Document.parse("{\"id\":1,\"scientist\":\"Updated\",\"country\":\"India\"}");
+   List<Document> docs = new ArrayList<>();
+   docs.add(doc);
+   pipeline
+       .apply(Create.of(docs))
+       .apply(
+           MongoDbIO.write()
+               .withUri("mongodb://localhost:" + port)
+               .withDatabase(DATABASE)
+               .withCollection(collectionName)
+               .withUpdateConfiguration(
+                   UpdateConfiguration.create()
+                       .withUpdateKey("id")
+                       .withUpdateFields(
+                           UpdateField.fieldUpdate("$set", "scientist", "scientist"),
+                           UpdateField.fieldUpdate("$set", "country", "country"))));
+   pipeline.run();
+
+   // An update without upsert must modify in place and never add documents.
+   assertEquals(numElements, countElements(collectionName));
+   Document out = getCollection(collectionName).find(new Document("_id", 1)).first();
+   assertEquals("Updated", out.get("scientist"));
+   assertEquals("India", out.get("country"));
+ }
+
+ private static List<Document> createDocuments(final int n, boolean addId) {
final String[] scientists =
new String[] {
"Einstein",
@@ -392,6 +422,9 @@ public class MongoDbIOTest {
for (int i = 1; i <= n; i++) {
int index = i % scientists.length;
Document document = new Document();
+ if (addId) {
+ document.append("_id", i);
+ }
document.append("scientist", scientists[index]);
document.append("country", country[index]);
documents.add(document);