Author: raulk
Date: Fri Oct 12 18:26:48 2012
New Revision: 1397669
URL: http://svn.apache.org/viewvc?rev=1397669&view=rev
Log:
Fixed: CAMEL-5696 camel-mongodb write operations should have a configurable
option to put WriteResult in a header + CAMEL-5697 camel-mongodb operations don't
transfer header values from IN to OUT. Backported to 2.10.x.
Added:
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbHeaderHandlingTest.java
- copied unchanged from r1397662,
camel/trunk/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbHeaderHandlingTest.java
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbComponent.java
Fri Oct 12 18:26:48 2012
@@ -16,7 +16,10 @@
*/
package org.apache.camel.component.mongodb;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import com.mongodb.Mongo;
@@ -28,6 +31,10 @@ import org.apache.camel.util.CamelContex
* Represents the component that manages {@link MongoDbEndpoint}.
*/
public class MongoDbComponent extends DefaultComponent {
+
+ public static final Set<MongoDbOperation> WRITE_OPERATIONS =
+ new
HashSet<MongoDbOperation>(Arrays.asList(MongoDbOperation.insert,
MongoDbOperation.save,
+ MongoDbOperation.update, MongoDbOperation.remove));
/**
* Should access a singleton of type Mongo
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbConstants.java
Fri Oct 12 18:26:48 2012
@@ -35,7 +35,8 @@ public final class MongoDbConstants {
public static final String WRITECONCERN = "CamelMongoDbWriteConcern";
public static final String LIMIT = "CamelMongoDbLimit";
public static final String FROM_TAILABLE = "CamelMongoDbTailable";
-
+ public static final String WRITERESULT = "CamelMongoWriteResult";
+
private MongoDbConstants() { }
}
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
Fri Oct 12 18:26:48 2012
@@ -55,6 +55,7 @@ public class MongoDbEndpoint extends Def
private WriteConcern writeConcernRef;
private ReadPreference readPreference;
private boolean dynamicity; // = false
+ private boolean writeResultAsHeader; // = false
// tailable cursor consumer by default
private MongoDbConsumerType consumerType;
private long cursorRegenerationDelay = 1000L;
@@ -500,4 +501,17 @@ public class MongoDbEndpoint extends Def
return persistentId;
}
+ public boolean isWriteResultAsHeader() {
+ return writeResultAsHeader;
+ }
+
+ /**
+ * In write operations, it determines whether instead of returning {@link
WriteResult} as the body of the OUT
+ * message, we transfer the IN message to the OUT and attach the
WriteResult as a header.
+ * @param writeResultAsHeader flag to indicate if this option is enabled
+ */
+ public void setWriteResultAsHeader(boolean writeResultAsHeader) {
+ this.writeResultAsHeader = writeResultAsHeader;
+ }
+
}
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbOperation.java
Fri Oct 12 18:26:48 2012
@@ -22,8 +22,8 @@ public enum MongoDbOperation {
findById,
findOneByQuery,
findAll,
- // group, // future
- // mapReduce, // future
+ // group, // future operation
+ // mapReduce, // future operation
// create/update operations
insert,
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbProducer.java
Fri Oct 12 18:26:48 2012
@@ -32,6 +32,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,11 +115,11 @@ public class MongoDbProducer extends Def
break;
case getDbStats:
- doGetStats(exchange, 'D');
+ doGetStats(exchange, MongoDbOperation.getDbStats);
break;
case getColStats:
- doGetStats(exchange, 'C');
+ doGetStats(exchange, MongoDbOperation.getColStats);
break;
default:
@@ -128,17 +129,20 @@ public class MongoDbProducer extends Def
// ----------- MongoDB operations ----------------
- protected void doGetStats(Exchange exchange, char c) {
+ protected void doGetStats(Exchange exchange, MongoDbOperation operation)
throws Exception {
DBObject result = null;
- if (c == 'C') {
+ if (operation == MongoDbOperation.getColStats) {
result = calculateCollection(exchange).getStats();
- } else if (c == 'D') {
+ } else if (operation == MongoDbOperation.getDbStats) {
// if it's a DB, also take into account the dynamicity option and
the DB that is used
result = calculateCollection(exchange).getDB().getStats();
+ } else {
+ throw new CamelMongoDbException("Internal error: wrong operation
for getStats variant" + operation);
}
- exchange.getOut().setBody(result);
+ Message responseMessage = prepareResponseMessage(exchange, operation);
+ responseMessage.setBody(result);
}
protected void doRemove(Exchange exchange) throws Exception {
@@ -147,13 +151,12 @@ public class MongoDbProducer extends Def
WriteConcern wc = extractWriteConcern(exchange);
WriteResult result = wc == null ? dbCol.remove(removeObj) :
dbCol.remove(removeObj, wc);
- processWriteResult(result, exchange);
- Message out = exchange.getOut();
- // we always return the WriteResult, because whether the getLastError
was called or not, the user will have the means to call it or
- // obtain the cached CommandResult
- out.setBody(result);
- out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.remove);
+ // we always return the WriteResult, because whether the getLastError
was called or not,
+ // the user will have the means to call it or obtain the cached
CommandResult
+ processAndTransferWriteResult(result, exchange);
+ resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED,
result.getN());
}
@SuppressWarnings("unchecked")
@@ -185,12 +188,11 @@ public class MongoDbProducer extends Def
: dbCol.update(updateCriteria, objNew,
calculateBooleanValue(upsert), calculateBooleanValue(multi), wc);
}
- processWriteResult(result, exchange);
- Message out = exchange.getOut();
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.update);
// we always return the WriteResult, because whether the getLastError
was called or not, the user will have the means to call it or
// obtain the cached CommandResult
- out.setBody(result);
- out.setHeader(MongoDbConstants.RECORDS_AFFECTED, result.getN());
+ processAndTransferWriteResult(result, exchange);
+ resultMessage.setHeader(MongoDbConstants.RECORDS_AFFECTED,
result.getN());
}
protected void doSave(Exchange exchange) throws Exception {
@@ -199,10 +201,11 @@ public class MongoDbProducer extends Def
WriteConcern wc = extractWriteConcern(exchange);
WriteResult result = wc == null ? dbCol.save(saveObj) :
dbCol.save(saveObj, wc);
- processWriteResult(result, exchange);
+
+ prepareResponseMessage(exchange, MongoDbOperation.save);
// we always return the WriteResult, because whether the getLastError
was called or not, the user will have the means to call it or
// obtain the cached CommandResult
- exchange.getOut().setBody(result);
+ processAndTransferWriteResult(result, exchange);
}
protected void doFindById(Exchange exchange) throws Exception {
@@ -217,10 +220,9 @@ public class MongoDbProducer extends Def
ret = dbCol.findOne(o, fieldFilter);
}
- Message out = exchange.getOut();
- out.setBody(ret);
- out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
-
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.save);
+ resultMessage.setBody(ret);
+ resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret ==
null ? 0 : 1);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -248,11 +250,11 @@ public class MongoDbProducer extends Def
result = wc == null ? dbCol.insert((List<DBObject>) insert) :
dbCol.insert((List<DBObject>) insert, wc);
}
- processWriteResult(result, exchange);
-
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.insert);
// we always return the WriteResult, because whether the getLastError
was called or not, the user will have the means to call it or
// obtain the cached CommandResult
- exchange.getOut().setBody(result);
+ processAndTransferWriteResult(result, exchange);
+ resultMessage.setBody(result);
}
protected void doFindAll(Exchange exchange) throws Exception {
@@ -296,10 +298,10 @@ public class MongoDbProducer extends Def
ret.limit(limit.intValue());
}
- Message out = exchange.getOut();
- out.setBody(ret.toArray());
- out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret.count());
- out.setHeader(MongoDbConstants.RESULT_PAGE_SIZE, ret.size());
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.findAll);
+ resultMessage.setBody(ret.toArray());
+ resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE,
ret.count());
+ resultMessage.setHeader(MongoDbConstants.RESULT_PAGE_SIZE,
ret.size());
} catch (Exception e) {
// rethrow the exception
@@ -325,15 +327,16 @@ public class MongoDbProducer extends Def
ret = dbCol.findOne(o, fieldFilter);
}
- Message out = exchange.getOut();
- out.setBody(ret);
- out.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret == null ? 0 : 1);
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.findOneByQuery);
+ resultMessage.setBody(ret);
+ resultMessage.setHeader(MongoDbConstants.RESULT_TOTAL_SIZE, ret ==
null ? 0 : 1);
}
protected void doCount(Exchange exchange) throws Exception {
DBCollection dbCol = calculateCollection(exchange);
Long answer = Long.valueOf(dbCol.count());
- exchange.getOut().setBody(answer);
+ Message resultMessage = prepareResponseMessage(exchange,
MongoDbOperation.count);
+ resultMessage.setBody(answer);
}
// --------- Convenience methods -----------------------
@@ -368,7 +371,7 @@ public class MongoDbProducer extends Def
return b == null ? false : b.booleanValue();
}
- private void processWriteResult(WriteResult result, Exchange exchange) {
+ private void processAndTransferWriteResult(WriteResult result, Exchange
exchange) {
// if invokeGetLastError is set, or a WriteConcern is set which
implicitly calls getLastError, then we have the chance to populate
// the MONGODB_LAST_ERROR header, as well as setting an exception on
the Exchange if one occurred at the MongoDB server
if (endpoint.isInvokeGetLastError() || (endpoint.getWriteConcern() !=
null ? endpoint.getWriteConcern().callGetLastError() : false)) {
@@ -378,6 +381,13 @@ public class MongoDbProducer extends Def
exchange.setException(MongoDbComponent.wrapInCamelMongoDbException(cr.getException()));
}
}
+
+ // determine where to set the WriteResult: as the OUT body or as an OUT
message header
+ if (endpoint.isWriteResultAsHeader()) {
+ exchange.getOut().setHeader(MongoDbConstants.WRITERESULT, result);
+ } else {
+ exchange.getOut().setBody(result);
+ }
}
private WriteConcern extractWriteConcern(Exchange exchange) throws
CamelMongoDbException {
@@ -415,4 +425,17 @@ public class MongoDbProducer extends Def
return dbObjectList;
}
+ private Message prepareResponseMessage(Exchange exchange, MongoDbOperation
operation) {
+ Message answer = exchange.getOut();
+ MessageHelper.copyHeaders(exchange.getIn(), answer, false);
+ if (isWriteOperation(operation) && endpoint.isWriteResultAsHeader()) {
+ answer.setBody(exchange.getIn().getBody());
+ }
+ return answer;
+ }
+
+ private boolean isWriteOperation(MongoDbOperation operation) {
+ return MongoDbComponent.WRITE_OPERATIONS.contains(operation);
+ }
+
}
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbDynamicityTest.java
Fri Oct 12 18:26:48 2012
@@ -33,6 +33,7 @@ public class MongoDbDynamicityTest exten
public void testInsertDynamicityDisabled() {
assertEquals(0, testCollection.count());
mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
assertFalse("The otherDB database should not exist",
mongo.getDatabaseNames().contains("otherDB"));
String body = "{\"_id\": \"testInsertDynamicityDisabled\", \"a\" :
\"1\"}";
@@ -60,6 +61,7 @@ public class MongoDbDynamicityTest exten
public void testInsertDynamicityEnabledDBOnly() {
assertEquals(0, testCollection.count());
mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
assertFalse("The otherDB database should not exist",
mongo.getDatabaseNames().contains("otherDB"));
String body = "{\"_id\": \"testInsertDynamicityEnabledDBOnly\", \"a\"
: \"1\"}";
@@ -85,6 +87,7 @@ public class MongoDbDynamicityTest exten
public void testInsertDynamicityEnabledCollectionOnly() {
assertEquals(0, testCollection.count());
mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
assertFalse("The otherDB database should not exist",
mongo.getDatabaseNames().contains("otherDB"));
String body = "{\"_id\":
\"testInsertDynamicityEnabledCollectionOnly\", \"a\" : \"1\"}";
@@ -109,6 +112,7 @@ public class MongoDbDynamicityTest exten
public void testInsertDynamicityEnabledDBAndCollection() {
assertEquals(0, testCollection.count());
mongo.getDB("otherDB").dropDatabase();
+ db.getCollection("otherCollection").drop();
assertFalse("The otherDB database should not exist",
mongo.getDatabaseNames().contains("otherDB"));
String body = "{\"_id\":
\"testInsertDynamicityEnabledDBAndCollection\", \"a\" : \"1\"}";
@@ -135,9 +139,9 @@ public class MongoDbDynamicityTest exten
return new RouteBuilder() {
public void configure() {
-
from("direct:noDynamicity").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert");
-
from("direct:noDynamicityExplicit").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false");
-
from("direct:dynamicityEnabled").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true");
+
from("direct:noDynamicity").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&writeConcern=SAFE");
+
from("direct:noDynamicityExplicit").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=false&writeConcern=SAFE");
+
from("direct:dynamicityEnabled").to("mongodb:myDb?database={{mongodb.testDb}}&collection={{mongodb.testCollection}}&operation=insert&dynamicity=true&writeConcern=SAFE");
}
};
Modified:
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java?rev=1397669&r1=1397668&r2=1397669&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-mongodb/src/test/java/org/apache/camel/component/mongodb/MongoDbOperationsTest.java
Fri Oct 12 18:26:48 2012
@@ -92,7 +92,6 @@ public class MongoDbOperationsTest exten
record1 = testCollection.findOne("testSave1");
assertEquals("Scientist field of 'testSave1' must equal 'Darwin' after
save operation", "Darwin", record1.get("scientist"));
- record1.put("scientist", "Darwin");
}
@@ -108,6 +107,7 @@ public class MongoDbOperationsTest exten
} else {
body = f.format("{\"_id\":\"testSave%d\",
\"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
}
+ f.close();
template.requestBody("direct:insert", body);
}
assertEquals(100L, testCollection.count());
@@ -139,6 +139,7 @@ public class MongoDbOperationsTest exten
} else {
body = f.format("{\"_id\":\"testSave%d\",
\"scientist\":\"Einstein\", \"extraField\": true}", i).toString();
}
+ f.close();
template.requestBody("direct:insert", body);
}
assertEquals(100L, testCollection.count());
@@ -172,6 +173,7 @@ public class MongoDbOperationsTest exten
String body = null;
Formatter f = new Formatter();
body = f.format("{\"_id\":\"testSave%d\",
\"scientist\":\"Einstein\"}", i).toString();
+ f.close();
template.requestBody("direct:insert", body);
}