This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f61a52  CAMEL-14872 mongo change stream headers (#3723)
5f61a52 is described below

commit 5f61a52b67f97abc8794cb18442c95736e263ab8
Author: Martin Bramwell <[email protected]>
AuthorDate: Fri Apr 10 07:03:36 2020 +0100

    CAMEL-14872 mongo change stream headers (#3723)
    
    * CAMEL-14872: Updates to include operationType header in change stream
    
    * CAMEL-14872: Added document ids to headers
    
    * CAMEL-14872: Change to do mapping for headers in the ChangeStream thread 
instead of the endpoint
    
    * CAMEL-14872: Removing no longer needed imports after changing the 
location of the processing
    
    * CAMEL-14872: Added new test and tidied up imports
    
    * CAMEL-14872: Notes in documentation on headers returned and exchange body 
contents
    
    * CAMEL-14872: Improved docs to fit better with overall page style
---
 .../mongodb/MongoDbChangeStreamsThread.java        | 10 ++++++
 .../camel/component/mongodb/MongoDbConstants.java  |  1 +
 .../mongodb/MongoDbChangeStreamsConsumerTest.java  | 38 ++++++++++++++++++++++
 .../modules/ROOT/pages/mongodb-component.adoc      | 15 +++++++++
 4 files changed, 64 insertions(+)

diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 2f94bdb..116b072 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -22,9 +22,11 @@ import com.mongodb.MongoException;
 import com.mongodb.client.ChangeStreamIterable;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.OperationType;
 import org.apache.camel.Exchange;
 import org.bson.BsonDocument;
 import org.bson.Document;
+import org.bson.types.ObjectId;
 
 import static org.apache.camel.component.mongodb.MongoDbConstants.MONGO_ID;
 
@@ -70,6 +72,14 @@ class MongoDbChangeStreamsThread extends 
MongoAbstractConsumerThread {
                 ChangeStreamDocument<Document> dbObj = 
(ChangeStreamDocument<Document>) cursor.next();
                 Exchange exchange = 
endpoint.createMongoDbExchange(dbObj.getFullDocument());
 
+                ObjectId documentId = 
dbObj.getDocumentKey().getObjectId(MONGO_ID).getValue();
+                OperationType operationType = dbObj.getOperationType();
+                
exchange.getIn().setHeader(MongoDbConstants.STREAM_OPERATION_TYPE, 
operationType.getValue());
+                exchange.getIn().setHeader(MongoDbConstants.MONGO_ID, 
documentId);
+                if (operationType == OperationType.DELETE) {
+                    exchange.getIn().setBody(new Document(MONGO_ID, 
documentId));
+                }
+
                 try {
                     if (log.isTraceEnabled()) {
                         log.trace("Sending exchange: {}, ObjectId: {}", 
exchange, dbObj.getFullDocument().get(MONGO_ID));
diff --git 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
index d34eb66..ebcde5c 100644
--- 
a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
+++ 
b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
@@ -43,6 +43,7 @@ public final class MongoDbConstants {
     public static final String ALLOW_DISK_USE = "CamelMongoDbAllowDiskUse";
     public static final String BULK_ORDERED = "CamelMongoDbBulkOrdered";
     public static final String MONGO_ID = "_id"; // default id field
+    public static final String STREAM_OPERATION_TYPE = 
"CamelMongoDbStreamOperationType";
 
     private MongoDbConstants() {
     }
diff --git 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
index c596be8..2c01284 100644
--- 
a/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
+++ 
b/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumerTest.java
@@ -18,12 +18,15 @@ package org.apache.camel.component.mongodb;
 
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.model.CreateCollectionOptions;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.bson.Document;
+import org.bson.types.ObjectId;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MongoDbChangeStreamsConsumerTest extends AbstractMongoDbTest {
 
@@ -89,7 +92,42 @@ public class MongoDbChangeStreamsConsumerTest extends 
AbstractMongoDbTest {
 
         Document actualDocument = 
mock.getExchanges().get(0).getIn().getBody(Document.class);
         assertEquals("value2", actualDocument.get("string"));
+        context.getRouteController().stopRoute(consumerRouteId);
+    }
+
+    @Test
+    public void operationTypeAndIdHeaderTest() throws Exception {
+        assertEquals(0, mongoCollection.countDocuments());
+        MockEndpoint mock = getMockEndpoint("mock:test");
+        mock.expectedMessageCount(2);
+
+        String consumerRouteId = "simpleConsumer";
+        addTestRoutes();
+        context.getRouteController().startRoute(consumerRouteId);
+
+        ObjectId objectId = new ObjectId();
+        Thread t = new Thread(() -> {
+            mongoCollection.insertOne(new Document("_id", 
objectId).append("string", "value"));
+            mongoCollection.deleteOne(new Document("_id", objectId));
+        });
+
+        t.start();
+        t.join();
+
+        mock.assertIsSatisfied();
 
+        Exchange insertExchange = mock.getExchanges().get(0);
+        assertEquals("insert", 
insertExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+        assertEquals(objectId, insertExchange.getIn().getHeader("_id"));
+
+        Exchange deleteExchange = mock.getExchanges().get(1);
+        Document deleteBodyDocument = 
deleteExchange.getIn().getBody(Document.class);
+        String deleteBody = "{\"_id\": \"" + objectId.toHexString() + "\"}";
+        assertEquals("delete", 
deleteExchange.getIn().getHeader("CamelMongoDbStreamOperationType"));
+        assertEquals(objectId, deleteExchange.getIn().getHeader("_id"));
+        assertEquals(1, deleteBodyDocument.size());
+        assertTrue(deleteBodyDocument.containsKey("_id"));
+        assertEquals(objectId.toHexString(), 
deleteBodyDocument.getObjectId("_id").toHexString());
         context.getRouteController().stopRoute(consumerRouteId);
     }
 
diff --git a/docs/components/modules/ROOT/pages/mongodb-component.adoc 
b/docs/components/modules/ROOT/pages/mongodb-component.adoc
index 070f904..6a746c2 100644
--- a/docs/components/modules/ROOT/pages/mongodb-component.adoc
+++ b/docs/components/modules/ROOT/pages/mongodb-component.adoc
@@ -967,6 +967,7 @@ 
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasing
 Change Streams allow applications to access real-time data changes without the 
complexity and risk of tailing the MongoDB oplog.
 Applications can use change streams to subscribe to all data changes on a 
collection and immediately react to them.
 Because change streams use the aggregation framework, applications can also 
filter for specific changes or transform the notifications at will.
+The exchange body will contain the full document of any change.
 
 To configure Change Streams Consumer you need to specify `consumerType`, 
`database`, `collection`
 and optional JSON property `streamFilter` to filter events.
@@ -991,6 +992,20 @@ 
from("mongodb:myDb?consumerType=changeStreams&database=flights&collection=ticket
     .to("mock:test");
 -------------
 
+The `changeStreams` consumer type will also return the following OUT headers:
+
+[width="100%",cols="10%,10%,60%,20%",options="header",]
+|=======================================================================
+|Header key |Quick constant |Description (extracted from MongoDB API doc) 
|Data type
+
+|`CamelMongoDbStreamOperationType` |`MongoDbConstants.STREAM_OPERATION_TYPE` | 
The type of operation that occurred. Can
+be any of the following values: insert, delete, replace, update, drop, rename, 
dropDatabase, invalidate. |String
+
+|`_id` |`MongoDbConstants.MONGO_ID` |A document that contains the _id of the 
document created or modified by the insert,
+replace, delete, update operations (i.e. CRUD operations). For sharded 
collections, also displays the full shard key for
+the document. The _id field is not repeated if it is already a part of the 
shard key. |ObjectId
+|=======================================================================
+
 == Type conversions
 
 The `MongoDbBasicConverters` type converter included with the

Reply via email to