This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 5f61a52 CAMEL-14872 mongo change stream headers (#3723)
5f61a52 is described below
commit 5f61a52b67f97abc8794cb18442c95736e263ab8
Author: Martin Bramwell <[email protected]>
AuthorDate: Fri Apr 10 07:03:36 2020 +0100
CAMEL-14872 mongo change stream headers (#3723)
* CAMEL-14872: Updates to include operationType header in change stream
* CAMEL-14872: Added document ids to headers
* CAMEL-14872: Change to do mapping for headers in the ChangeStream thread
instead of the endpoint
* CAMEL-14872: Removing no longer needed imports after changing the
location of the processing
* CAMEL-14872: Added new test and tidied up imports
* CAMEL-14872: Notes in documentation on headers returned and exchange body
contents
* CAMEL-14872: Improved docs to fit better with overall page style
---
.../mongodb/MongoDbChangeStreamsThread.java | 10 ++++++
.../camel/component/mongodb/MongoDbConstants.java | 1 +
.../mongodb/MongoDbChangeStreamsConsumerTest.java | 38 ++++++++++++++++++++++
.../modules/ROOT/pages/mongodb-component.adoc | 15 +++++++++
4 files changed, 64 insertions(+)
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 2f94bdb..116b072 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -22,9 +22,11 @@ import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.OperationType;
import org.apache.camel.Exchange;
import org.bson.BsonDocument;
import org.bson.Document;
+import org.bson.types.ObjectId;
import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
@@ -70,6 +72,14 @@ class MongoDbChangeStreamsThread extends
MongoAbstractConsumerThread {
ChangeStreamDocument<Document> dbObj =
(ChangeStreamDocument<Document>) cursor.next();
Exchange exchange =
endpoint.createMongoDbExchange(dbObj.getFullDocument());
+ ObjectId documentId =
dbObj.getDocumentKey().getObjectId(MONGO_ID).getValue();
+ OperationType operationType = dbObj.getOperationType();
+
exchange.getIn().setHeader(MongoDbConstants.STREAM_OPERATION_TYPE,
operationType.getValue());
+ exchange.getIn().setHeader(MongoDbConstants.MONGO_ID,
documentId);
+ if (operationType == OperationType.DELETE) {
+ exchange.getIn().setBody(new Document(MONGO_ID,
documentId));
+ }
+
try {
if (log.isTraceEnabled()) {
log.trace("Sending exchange: {}, ObjectId: {}",
exchange, dbObj.getFullDocument().get(MONGO_ID));
diff --git
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
index d34eb66..ebcde5c 100644
---
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
+++
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
@@ -43,6 +43,7 @@ public final class MongoDbConstants {
public static final String ALLOW_DISK_USE = "CamelMongoDbAllowDiskUse";
public static final String BULK_ORDERED = "CamelMongoDbBulkOrdered";
public static final String MONGO_ID = "_id"; // default id field
+ public static final String STREAM_OPERATION_TYPE =
"CamelMongoDbStreamOperationType";
private MongoDbConstants() {
}
diff --git
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
index c596be8..2c01284 100644
---
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
+++
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
@@ -18,12 +18,15 @@ package org.apache.camel.component.mongodb;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.bson.Document;
+import org.bson.types.ObjectId;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MongoDbChangeStreamsConsumerTest extends AbstractMongoDbTest {
@@ -89,7 +92,42 @@ public class MongoDbChangeStreamsConsumerTest extends
AbstractMongoDbTest {
Document actualDocument =
mock.getExchanges().get(0).getIn().getBody(Document.class);
assertEquals("value2", actualDocument.get("string"));
+ context.getRouteController().stopRoute(consumerRouteId);
+ }
+
+ @Test
+ public void operationTypeAndIdHeaderTest() throws Exception {
+ assertEquals(0, mongoCollection.countDocuments());
+ MockEndpoint mock = getMockEndpoint("mock:test");
+ mock.expectedMessageCount(2);
+
+ String consumerRouteId = "simpleConsumer";
+ addTestRoutes();
+ context.getRouteController().startRoute(consumerRouteId);
+
+ ObjectId objectId = new ObjectId();
+ Thread t = new Thread(() -> {
+ mongoCollection.insertOne(new Document("_id",
objectId).append("string", "value"));
+ mongoCollection.deleteOne(new Document("_id", objectId));
+ });
+
+ t.start();
+ t.join();
+
+ mock.assertIsSatisfied();
+ Exchange insertExchange = mock.getExchanges().get(0);
+ assertEquals("insert",
insertExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+ assertEquals(objectId, insertExchange.getIn().getHeader("_id"));
+
+ Exchange deleteExchange = mock.getExchanges().get(1);
+ Document deleteBodyDocument =
deleteExchange.getIn().getBody(Document.class);
+ String deleteBody = "{\"_id\": \"" + objectId.toHexString() + "\"}";
+ assertEquals("delete",
deleteExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+ assertEquals(objectId, deleteExchange.getIn().getHeader("_id"));
+ assertEquals(1, deleteBodyDocument.size());
+ assertTrue(deleteBodyDocument.containsKey("_id"));
+ assertEquals(objectId.toHexString(),
deleteBodyDocument.getObjectId("_id").toHexString());
context.getRouteController().stopRoute(consumerRouteId);
}
diff --git a/docs/components/modules/ROOT/pages/mongodb-component.adoc
b/docs/components/modules/ROOT/pages/mongodb-component.adoc
index 070f904..6a746c2 100644
--- a/docs/components/modules/ROOT/pages/mongodb-component.adoc
+++ b/docs/components/modules/ROOT/pages/mongodb-component.adoc
@@ -967,6 +967,7 @@
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasing
Change Streams allow applications to access real-time data changes without the
complexity and risk of tailing the MongoDB oplog.
Applications can use change streams to subscribe to all data changes on a
collection and immediately react to them.
Because change streams use the aggregation framework, applications can also
filter for specific changes or transform the notifications at will.
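+The exchange body will contain the full document of any change, except for delete operations, where the body is a document containing only the `_id` of the deleted document.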
To configure Change Streams Consumer you need to specify `consumerType`,
`database`, `collection`
and optional JSON property `streamFilter` to filter events.
@@ -991,6 +992,20 @@
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=ticket
.to("mock:test");
-------------
+The `changeStreams` consumer type will also set the following headers on the IN message of each exchange:
+
+[width="100%",cols="10%,10%,60%,20%",options="header",]
+|=======================================================================
+|Header key |Quick constant |Description (extracted from MongoDB API doc)
|Data type
+
+|`CamelMongoDbStreamOperationType` |`MongoDbConstants.STREAM_OPERATION_TYPE` |
The type of operation that occurred. Can
+be any of the following values: insert, delete, replace, update, drop, rename,
dropDatabase, invalidate. |String
+
+|`_id` |`MongoDbConstants.MONGO_ID` |The `_id` value, read from the change event's
document key, of the document created or modified by the insert,
+replace, delete, update operations (i.e. CRUD operations). For sharded
collections, the document key also holds the full shard key for
+the document (the _id field is not repeated if it is already a part of the
shard key), but only the `_id` value is placed in this header. |ObjectId
+|=======================================================================
+
== Type conversions
The `MongoDbBasicConverters` type converter included with the