Author: raulk
Date: Fri Oct 12 18:26:48 2012
New Revision: 1397669

URL: http://svn.apache.org/viewvc?rev=1397669&view=rev
Log:
Fixed: CAMEL-5696 camel-mongodb write operations should have a configurable 
option to put WriteResult in header + CAMEL-5697 camel-mongodb operations don't 
transfer header values from IN to OUT. Backported to 2.10.x.

Added:
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbHeaderHandlingTest.java
      - copied unchanged from r1397662, 
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbHeaderHandlingTest.java
Modified:
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
    
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
 Fri Oct 12 18:26:48 2012
@@ -16,7 +16,10 @@
  */
 package org.apache.camel.component.mongodb;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import com.mongodb.Mongo;
 
@@ -28,6 +31,10 @@ import org.apache.camel.util.CamelContex
  * Represents the component that manages {@link MongoDbEndpoint}.
  */
 public class MongoDbComponent extends DefaultComponent {
+    
+    public static final Set<MongoDbOperation> WRITE_OPERATIONS = 
+            new 
HashSet<MongoDbOperation>(Arrays.asList(MongoDbOperation.insert, 
MongoDbOperation.save, 
+                    MongoDbOperation.update, MongoDbOperation.remove));
 
     /**
      * Should access a singleton of type Mongo

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
 Fri Oct 12 18:26:48 2012
@@ -35,7 +35,8 @@ public final class MongoDbConstants {
     public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
     public static final String LIMIT = "CamelMongoDbLimit";
     public static final String FROM_TAILABLE = "CamelMongoDbTailable";
-    
+    public static final String WRITERESULT = "CamelMongoWriteResult";
+
     private MongoDbConstants() { }
     
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
 Fri Oct 12 18:26:48 2012
@@ -55,6 +55,7 @@ public class MongoDbEndpoint extends Def
     private WriteConcern writeConcernRef;
     private ReadPreference readPreference;
     private boolean dynamicity; // = false
+    private boolean writeResultAsHeader; // = false
     // tailable cursor consumer by default
     private MongoDbConsumerType consumerType;
     private long cursorRegenerationDelay = 1000L;
@@ -500,4 +501,17 @@ public class MongoDbEndpoint extends Def
         return persistentId;
     }
 
+    public boolean isWriteResultAsHeader() {
+        return writeResultAsHeader;
+    }
+
+    /**
+     * In write operations, it determines whether instead of returning {@link 
WriteResult} as the body of the OUT
+     * message, we transfer the IN message to the OUT and attach the 
WriteResult as a header.
+     * @param writeResultAsHeader flag to indicate if this option is enabled
+     */
+    public void setWriteResultAsHeader(boolean writeResultAsHeader) {
+        this.writeResultAsHeader = writeResultAsHeader;
+    }
+
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
 Fri Oct 12 18:26:48 2012
@@ -22,8 +22,8 @@ public enum MongoDbOperation {
     findById,
     findOneByQuery,
     findAll,
-    // group, // future
-    // mapReduce, // future
+    // group,       // future operation
+    // mapReduce,   // future operation
     
     // create/update operations
     insert,

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
 Fri Oct 12 18:26:48 2012
@@ -32,6 +32,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,11 +115,11 @@ public class MongoDbProducer extends Def
             break;
 
         case getDbStats:
-            doGetStats(exchange, 'D');
+            doGetStats(exchange, MongoDbOperation.getDbStats);
             break;
 
         case getColStats:
-            doGetStats(exchange, 'C');
+            doGetStats(exchange, MongoDbOperation.getColStats);
             break;
 
         default:
@@ -128,17 +129,20 @@ public class MongoDbProducer extends Def
 
     // ----------- MongoDB operations ----------------
     
-    protected void doGetStats(Exchange exchange, char c) {
+    protected void doGetStats(Exchange exchange, MongoDbOperation operation) 
throws Exception {
         DBObject result = null;
         
-        if (c == 'C') {
+        if (operation == MongoDbOperation.getColStats) {
             result = calculateCollection(exchange).getStats();
-        } else if (c == 'D') {
+        } else if (operation == MongoDbOperation.getDbStats) {
             // if it's a DB, also take into account the dynamicity option and 
the DB that is used
             result = calculateCollection(exchange).getDB().getStats();
+        } else {
+            throw new CamelMongoDbException("Internal error: wrong operation 
for getStats variant" + operation);
         }
 
-        exchange.getOut().setBody(result);
+        Message responseMessage = prepareResponseMessage(exchange, operation);
+        responseMessage.setBody(result);
     }
 
     protected void doRemove(Exchange exchange) throws Exception {
@@ -147,13 +151,12 @@ public class MongoDbProducer extends Def
         
         WriteConcern wc = extractWriteConcern(exchange);
         WriteResult result = wc == null ? dbCol.remove(removeObj) : 
dbCol.remove(removeObj, wc);
-        processWriteResult(result, exchange);
         
-        Message out = exchange.getOut();
-        // we always return the WriteResult, because whether the getLastError 
was called or not, the user will have the means to call it or 
-        // obtain the cached CommandResult
-        out.setBody(result);
-        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.remove);
+        // we always return the WriteResult, because whether the getLastError 
was called or not, 
+        // the user will have the means to call it or obtain the cached 
CommandResult
+        processAndTransferWriteResult(result, exchange);
+        resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, 
result.getN());
     }
 
     @SuppressWarnings("unchecked")
@@ -185,12 +188,11 @@ public class MongoDbProducer extends Def
                     : dbCol.update(updateCriteria, objNew, 
calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
         }
         
-        processWriteResult(result, exchange);
-        Message out = exchange.getOut();
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.update);
         // we always return the WriteResult, because whether the getLastError 
was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
-        out.setBody(result);
-        out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+        processAndTransferWriteResult(result, exchange);
+        resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED, 
result.getN());
     }
     
     protected void doSave(Exchange exchange) throws Exception {
@@ -199,10 +201,11 @@ public class MongoDbProducer extends Def
         
         WriteConcern wc = extractWriteConcern(exchange);
         WriteResult result = wc == null ? dbCol.save(saveObj) : 
dbCol.save(saveObj, wc);
-        processWriteResult(result, exchange);
+        
+        prepareResponseMessage(exchange, MongoDbOperation.save);
         // we always return the WriteResult, because whether the getLastError 
was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
-        exchange.getOut().setBody(result);
+        processAndTransferWriteResult(result, exchange);
     }
     
     protected void doFindById(Exchange exchange) throws Exception {
@@ -217,10 +220,9 @@ public class MongoDbProducer extends Def
             ret = dbCol.findOne(o, fieldFilter);
         }
     
-        Message out = exchange.getOut();
-        out.setBody(ret);
-        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
-        
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.save);
+        resultMessage.setBody(ret);
+        resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == 
null ? 0 : 1);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -248,11 +250,11 @@ public class MongoDbProducer extends Def
             result = wc == null ? dbCol.insert((List<DBObject>) insert) : 
dbCol.insert((List<DBObject>) insert, wc);
         }
         
-        processWriteResult(result, exchange);
-        
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.insert);
         // we always return the WriteResult, because whether the getLastError 
was called or not, the user will have the means to call it or 
         // obtain the cached CommandResult
-        exchange.getOut().setBody(result);
+        processAndTransferWriteResult(result, exchange);
+        resultMessage.setBody(result);
     }
 
     protected void doFindAll(Exchange exchange) throws Exception {
@@ -296,10 +298,10 @@ public class MongoDbProducer extends Def
                 ret.limit(limit.intValue());
             }
             
-            Message out = exchange.getOut();
-            out.setBody(ret.toArray());
-            out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count());
-            out.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size());
+            Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.findAll);
+            resultMessage.setBody(ret.toArray());
+            resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, 
ret.count());
+            resultMessage.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, 
ret.size());
             
         } catch (Exception e) {
             // rethrow the exception
@@ -325,15 +327,16 @@ public class MongoDbProducer extends Def
             ret = dbCol.findOne(o, fieldFilter);
         }
         
-        Message out = exchange.getOut();
-        out.setBody(ret);
-        out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.findOneByQuery);
+        resultMessage.setBody(ret);
+        resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == 
null ? 0 : 1);
     }
 
     protected void doCount(Exchange exchange) throws Exception {
         DBCollection dbCol = calculateCollection(exchange);
         Long answer = Long.valueOf(dbCol.count());
-        exchange.getOut().setBody(answer);
+        Message resultMessage = prepareResponseMessage(exchange, 
MongoDbOperation.count);
+        resultMessage.setBody(answer);
     }
     
     // --------- Convenience methods -----------------------
@@ -368,7 +371,7 @@ public class MongoDbProducer extends Def
         return b == null ? false : b.booleanValue();      
     }
     
-    private void processWriteResult(WriteResult result, Exchange exchange) {
+    private void processAndTransferWriteResult(WriteResult result, Exchange 
exchange) {
         // if invokeGetLastError is set, or a WriteConcern is set which 
implicitly calls getLastError, then we have the chance to populate 
         // the MONGODB_LAST_ERROR header, as well as setting an exception on 
the Exchange if one occurred at the MongoDB server
         if (endpoint.isInvokeGetLastError() || (endpoint.getWriteConcern() != 
null ? endpoint.getWriteConcern().callGetLastError() : false)) {
@@ -378,6 +381,13 @@ public class MongoDbProducer extends Def
                 
exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
             }
         }
+        
+        // determine where to set the WriteResult: as the OUT body or as an IN 
message header
+        if (endpoint.isWriteResultAsHeader()) {
+            exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result);
+        } else {
+            exchange.getOut().setBody(result);
+        }
     }
     
     private WriteConcern extractWriteConcern(Exchange exchange) throws 
CamelMongoDbException {
@@ -415,4 +425,17 @@ public class MongoDbProducer extends Def
         return dbObjectList;
     }
     
+    private Message prepareResponseMessage(Exchange exchange, MongoDbOperation 
operation) {
+        Message answer = exchange.getOut();
+        MessageHelper.copyHeaders(exchange.getIn(), answer, false);
+        if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+            answer.setBody(exchange.getIn().getBody());
+        }
+        return answer;
+    }
+    
+    private boolean isWriteOperation(MongoDbOperation operation) {
+        return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
+    }
+    
 }

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
 Fri Oct 12 18:26:48 2012
@@ -33,6 +33,7 @@ public class MongoDbDynamicityTest exten
     public void testInsertDynamicityDisabled() {
         assertEquals(0, testCollection.count());
         mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", 
mongo.getDatabaseNames().contains("otherDB"));
 
         String body = "{\"_id\": \"testInsertDynamicityDisabled\", \"a\" : 
\"1\"}";
@@ -60,6 +61,7 @@ public class MongoDbDynamicityTest exten
     public void testInsertDynamicityEnabledDBOnly() {
         assertEquals(0, testCollection.count());
         mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", 
mongo.getDatabaseNames().contains("otherDB"));
 
         String body = "{\"_id\": \"testInsertDynamicityEnabledDBOnly\", \"a\" 
: \"1\"}";
@@ -85,6 +87,7 @@ public class MongoDbDynamicityTest exten
     public void testInsertDynamicityEnabledCollectionOnly() {
         assertEquals(0, testCollection.count());
         mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", 
mongo.getDatabaseNames().contains("otherDB"));
 
         String body = "{\"_id\": 
\"testInsertDynamicityEnabledCollectionOnly\", \"a\" : \"1\"}";
@@ -109,6 +112,7 @@ public class MongoDbDynamicityTest exten
     public void testInsertDynamicityEnabledDBAndCollection() {
         assertEquals(0, testCollection.count());
         mongo.getDB("otherDB").dropDatabase();
+        db.getCollection("otherCollection").drop();
         assertFalse("The otherDB database should not exist", 
mongo.getDatabaseNames().contains("otherDB"));
 
         String body = "{\"_id\": 
\"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\"}";
@@ -135,9 +139,9 @@ public class MongoDbDynamicityTest exten
         return new RouteBuilder() {
             public void configure() {
                                 
-                
from("direct:noDynamicity").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
-                
from("direct:noDynamicityExplicit").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false");
-                
from("direct:dynamicityEnabled").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+                
from("direct:noDynamicity").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&writeConcern=SAFE");
+                
from("direct:noDynamicityExplicit").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false&writeConcern=SAFE");
+                
from("direct:dynamicityEnabled").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE");
 
             }
         };

Modified: 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
--- 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
 (original)
+++ 
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
 Fri Oct 12 18:26:48 2012
@@ -92,7 +92,6 @@ public class MongoDbOperationsTest exten
         
         record1 = testCollection.findOne("testSave1");
         assertEquals("Scientist field of 'testSave1' must equal 'Darwin' after 
save operation", "Darwin", record1.get("scientist"));
-        record1.put("scientist", "Darwin");
 
     }
     
@@ -108,6 +107,7 @@ public class MongoDbOperationsTest exten
             } else {
                 body = f.format("{\"_id\":\"testSave%d\", 
\"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
             }
+            f.close();
             template.requestBody("direct:insert", body);
         }
         assertEquals(100L, testCollection.count());
@@ -139,6 +139,7 @@ public class MongoDbOperationsTest exten
             } else {
                 body = f.format("{\"_id\":\"testSave%d\", 
\"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
             }
+            f.close();
             template.requestBody("direct:insert", body);
         }
         assertEquals(100L, testCollection.count());
@@ -172,6 +173,7 @@ public class MongoDbOperationsTest exten
             String body = null;
             Formatter f = new Formatter();
             body = f.format("{\"_id\":\"testSave%d\", 
\"scientist\":\"Einstein\"}", i).toString();
+            f.close();
             template.requestBody("direct:insert", body);
         }
         


Reply via email to