Author: davsclaus
Date: Sat Sep 3 06:56:02 2011
New Revision: 1164809
URL: http://svn.apache.org/viewvc?rev=1164809&view=rev
Log:
CAMEL-4409: Adding test for ordering in camel-sql. Polished. Thanks to Mathieu
Lalonde for the patch.
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClassLoadingAwareObjectInputStream.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerConcurrentTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlProducer.java Sat Sep 3 06:56:02 2011
@@ -20,6 +20,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
@@ -39,11 +40,10 @@ public class SqlProducer extends Default
this.query = query;
}
- @SuppressWarnings("unchecked")
public void process(final Exchange exchange) throws Exception {
String queryHeader =
exchange.getIn().getHeader(SqlConstants.SQL_QUERY, String.class);
- jdbcTemplate.execute(queryHeader != null ? queryHeader : query, new
PreparedStatementCallback() {
- public Object doInPreparedStatement(PreparedStatement ps) throws
SQLException, DataAccessException {
+ jdbcTemplate.execute(queryHeader != null ? queryHeader : query, new
PreparedStatementCallback<Map<?, ?>>() {
+ public Map<?, ?> doInPreparedStatement(PreparedStatement ps)
throws SQLException, DataAccessException {
int argNumber = 1;
// number of parameters must match
@@ -63,8 +63,8 @@ public class SqlProducer extends Default
boolean isResultSet = ps.execute();
if (isResultSet) {
- RowMapperResultSetExtractor mapper = new
RowMapperResultSetExtractor(new ColumnMapRowMapper());
- List<?> result = (List<?>)
mapper.extractData(ps.getResultSet());
+ RowMapperResultSetExtractor<Map<String, Object>> mapper =
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper());
+ List<Map<String, Object>> result =
mapper.extractData(ps.getResultSet());
exchange.getOut().setBody(result);
exchange.getIn().setHeader(SqlConstants.SQL_ROW_COUNT,
result.size());
// preserve headers
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClassLoadingAwareObjectInputStream.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClassLoadingAwareObjectInputStream.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClassLoadingAwareObjectInputStream.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClassLoadingAwareObjectInputStream.java Sat Sep 3 06:56:02 2011
@@ -33,7 +33,7 @@ public class ClassLoadingAwareObjectInpu
/**
* <p>Maps primitive type names to corresponding class objects.</p>
*/
- private static final HashMap<String, Class> PRIM_CLASSES = new
HashMap<String, Class>(8, 1.0F);
+ private static final HashMap<String, Class<?>> PRIM_CLASSES = new
HashMap<String, Class<?>>(8, 1.0F);
private CamelContext camelContext;
@@ -51,7 +51,7 @@ public class ClassLoadingAwareObjectInpu
@Override
protected Class<?> resolveProxyClass(String[] interfaces) throws
IOException, ClassNotFoundException {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
- Class[] cinterfaces = new Class[interfaces.length];
+ Class<?>[] cinterfaces = new Class[interfaces.length];
for (int i = 0; i < interfaces.length; i++) {
cinterfaces[i] =
camelContext.getClassResolver().resolveClass(interfaces[i], cl);
}
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java Sat Sep 3 06:56:02 2011
@@ -109,9 +109,8 @@ public class JdbcAggregationRepository e
jdbcTemplate = new JdbcTemplate(dataSource);
}
- @SuppressWarnings("unchecked")
public Exchange add(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
- return (Exchange) transactionTemplate.execute(new
TransactionCallback() {
+ return transactionTemplate.execute(new TransactionCallback<Exchange>()
{
public Exchange doInTransaction(TransactionStatus status) {
String sql;
@@ -163,9 +162,8 @@ public class JdbcAggregationRepository e
return result;
}
- @SuppressWarnings("unchecked")
private Exchange get(final String key, final String repositoryName, final
CamelContext camelContext) {
- return (Exchange) transactionTemplateReadOnly.execute(new
TransactionCallback() {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<Exchange>() {
public Exchange doInTransaction(TransactionStatus status) {
try {
final byte[] data = jdbcTemplate.queryForObject(
@@ -226,9 +224,8 @@ public class JdbcAggregationRepository e
});
}
- @SuppressWarnings("unchecked")
public Set<String> getKeys() {
- return (LinkedHashSet<String>) transactionTemplateReadOnly.execute(new
TransactionCallback() {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + getRepositoryName(),
new RowMapper<String>() {
@@ -243,9 +240,8 @@ public class JdbcAggregationRepository e
});
}
- @SuppressWarnings("unchecked")
public Set<String> scan(CamelContext camelContext) {
- return (LinkedHashSet<String>) transactionTemplateReadOnly.execute(new
TransactionCallback() {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + getRepositoryNameCompleted(),
new RowMapper<String>() {
Modified:
camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java (original)
+++ camel/trunk/components/camel-sql/src/main/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepository.java Sat Sep 3 06:56:02 2011
@@ -69,11 +69,10 @@ public class JdbcMessageIdRepository ext
}
@ManagedOperation(description = "Adds the key to the store")
- @SuppressWarnings({ "unchecked", "rawtypes" })
public boolean add(final String messageId) {
// Run this in single transaction.
- Boolean rc = (Boolean)transactionTemplate.execute(new
TransactionCallback() {
- public Object doInTransaction(TransactionStatus status) {
+ Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
int count = jdbcTemplate.queryForInt(QUERY_STRING,
processorName, messageId);
if (count == 0) {
jdbcTemplate.update(INSERT_STRING, processorName,
messageId, new Timestamp(System.currentTimeMillis()));
@@ -87,11 +86,10 @@ public class JdbcMessageIdRepository ext
}
@ManagedOperation(description = "Does the store contain the given key")
- @SuppressWarnings({ "unchecked", "rawtypes" })
public boolean contains(final String messageId) {
// Run this in single transaction.
- Boolean rc = (Boolean)transactionTemplate.execute(new
TransactionCallback() {
- public Object doInTransaction(TransactionStatus status) {
+ Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
int count = jdbcTemplate.queryForInt(QUERY_STRING,
processorName, messageId);
if (count == 0) {
return Boolean.FALSE;
@@ -104,10 +102,9 @@ public class JdbcMessageIdRepository ext
}
@ManagedOperation(description = "Remove the key from the store")
- @SuppressWarnings({ "unchecked", "rawtypes" })
public boolean remove(final String messageId) {
- Boolean rc = (Boolean)transactionTemplate.execute(new
TransactionCallback() {
- public Object doInTransaction(TransactionStatus status) {
+ Boolean rc = transactionTemplate.execute(new
TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
int updateCount = jdbcTemplate.update(DELETE_STRING,
processorName, messageId);
if (updateCount == 0) {
return Boolean.FALSE;
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java Sat Sep 3 06:56:02 2011
@@ -63,10 +63,10 @@ public class SqlDataSourceRefTest extend
mock.assertIsSatisfied();
// the result is a List
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
// and each row in the list is a Map
- Map row = assertIsInstanceOf(Map.class, received.get(0));
+ Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
// and we should be able the get the project from the map that should
be Linux
assertEquals("Linux", row.get("PROJECT"));
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerConcurrentTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerConcurrentTest.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerConcurrentTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlProducerConcurrentTest.java Sat Sep 3 06:56:02 2011
@@ -58,10 +58,10 @@ public class SqlProducerConcurrentTest e
getMockEndpoint("mock:result").expectedMessageCount(files);
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
- Map<Integer, Future> responses = new ConcurrentHashMap<Integer,
Future>();
+ Map<Integer, Future<?>> responses = new ConcurrentHashMap<Integer,
Future<?>>();
for (int i = 0; i < files; i++) {
final int index = i;
- Future out = executor.submit(new Callable<Object>() {
+ Future<?> out = executor.submit(new Callable<Object>() {
public Object call() throws Exception {
int id = index % 3;
return template.requestBody("direct:simple", "" + id);
@@ -75,8 +75,8 @@ public class SqlProducerConcurrentTest e
assertEquals(files, responses.size());
for (int i = 0; i < files; i++) {
- List rows = (List) responses.get(i).get();
- Map columns = (Map) rows.get(0);
+ List<?> rows = assertIsInstanceOf(List.class,
responses.get(i).get());
+ Map<?, ?> columns = assertIsInstanceOf(Map.class, rows.get(0));
if (i % 3 == 0) {
assertEquals("Camel", columns.get("PROJECT"));
} else if (i % 3 == 1) {
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlRouteTest.java Sat Sep 3 06:56:02 2011
@@ -17,8 +17,10 @@
package org.apache.camel.component.sql;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.sql.DataSource;
@@ -52,8 +54,8 @@ public class SqlRouteTest extends CamelT
mock.expectedMessageCount(1);
template.sendBody("direct:simple", "XXX");
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
- Map row = assertIsInstanceOf(Map.class, received.get(0));
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
assertEquals("Linux", row.get("PROJECT"));
}
@@ -64,8 +66,8 @@ public class SqlRouteTest extends CamelT
template.sendBodyAndHeader("direct:simple", "Camel",
SqlConstants.SQL_QUERY, "select * from projects where project = ? order by id");
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
- Map row = assertIsInstanceOf(Map.class, received.get(0));
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
assertEquals(1, row.get("id"));
assertEquals("ASF", row.get("license"));
mock.reset();
@@ -88,9 +90,15 @@ public class SqlRouteTest extends CamelT
body.add("Camel");
template.sendBody("direct:list", body);
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
- Map row = assertIsInstanceOf(Map.class, received.get(0));
- assertEquals(1, row.get("ID"));
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ Map<?, ?> firstRow = assertIsInstanceOf(Map.class, received.get(0));
+ assertEquals(1, firstRow.get("ID"));
+
+ // unlikely to have accidental ordering with 3 rows x 3 columns
+ for (Object obj : received) {
+ Map<?, ?> row = assertIsInstanceOf(Map.class, obj);
+ assertTrue("not preserving key ordering for a given row keys: " +
row.keySet(), isOrdered(row.keySet()));
+ }
}
@Test
@@ -124,11 +132,11 @@ public class SqlRouteTest extends CamelT
body.add("ASF");
template.sendBody("direct:simple", body);
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
assertEquals(2, received.size());
- Map row1 = assertIsInstanceOf(Map.class, received.get(0));
+ Map<?, ?> row1 = assertIsInstanceOf(Map.class, received.get(0));
assertEquals("Camel", row1.get("PROJECT"));
- Map row2 = assertIsInstanceOf(Map.class, received.get(1));
+ Map<?, ?> row2 = assertIsInstanceOf(Map.class, received.get(1));
assertEquals("AMQ", row2.get("PROJECT"));
}
@@ -140,9 +148,9 @@ public class SqlRouteTest extends CamelT
body.add("ASF");
template.sendBody("direct:simpleLimited", body);
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
assertEquals(1, received.size());
- Map row1 = assertIsInstanceOf(Map.class, received.get(0));
+ Map<?, ?> row1 = assertIsInstanceOf(Map.class, received.get(0));
assertEquals("Camel", row1.get("PROJECT"));
}
@@ -170,8 +178,8 @@ public class SqlRouteTest extends CamelT
mock.expectedMessageCount(1);
template.sendBody("direct:no-param", null);
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
- Map row = assertIsInstanceOf(Map.class, received.get(0));
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
assertEquals("Camel", row.get("PROJECT"));
}
@@ -183,7 +191,7 @@ public class SqlRouteTest extends CamelT
mock.assertIsSatisfied();
Number received = assertIsInstanceOf(Number.class,
mock.getReceivedExchanges().get(0).getIn().getHeader(SqlConstants.SQL_UPDATE_COUNT));
assertEquals(1, received.intValue());
- Map projectNameInserted = jdbcTemplate.queryForMap("select project,
license from projects where id = 5");
+ Map<?, ?> projectNameInserted = jdbcTemplate.queryForMap("select
project, license from projects where id = 5");
assertEquals("#", projectNameInserted.get("PROJECT"));
assertEquals("XGPL", projectNameInserted.get("LICENSE"));
}
@@ -194,8 +202,8 @@ public class SqlRouteTest extends CamelT
mock.expectedMessageCount(1);
template.sendBody("direct:no-param", "Mock body");
mock.assertIsSatisfied();
- List received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
- Map row = assertIsInstanceOf(Map.class, received.get(0));
+ List<?> received = assertIsInstanceOf(List.class,
mock.getReceivedExchanges().get(0).getIn().getBody());
+ Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0));
assertEquals("Camel", row.get("PROJECT"));
}
@@ -249,5 +257,17 @@ public class SqlRouteTest extends CamelT
}
};
}
+
+ private boolean isOrdered(Set<?> keySet) {
+ assertTrue("isOrdered() requires the following keys: id, project,
license", keySet.contains("id"));
+ assertTrue("isOrdered() requires the following keys: id, project,
license", keySet.contains("project"));
+ assertTrue("isOrdered() requires the following keys: id, project,
license", keySet.contains("license"));
+
+ // the implementation uses a case insensitive Map
+ final Iterator<?> it = keySet.iterator();
+ return "id".equalsIgnoreCase(assertIsInstanceOf(String.class,
it.next()))
+ && "project".equalsIgnoreCase(assertIsInstanceOf(String.class,
it.next()))
+ && "license".equalsIgnoreCase(assertIsInstanceOf(String.class,
it.next()));
+ }
}
Modified:
camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java?rev=1164809&r1=1164808&r2=1164809&view=diff
==============================================================================
--- camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java (original)
+++ camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/idempotent/jdbc/JdbcMessageIdRepositoryTest.java Sat Sep 3 06:56:02 2011
@@ -73,14 +73,13 @@ public class JdbcMessageIdRepositoryTest
return new
ClassPathXmlApplicationContext("org/apache/camel/processor/idempotent/jdbc/spring.xml");
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
protected void setupRepository() {
TransactionTemplate transactionTemplate = new TransactionTemplate();
transactionTemplate.setTransactionManager(new
DataSourceTransactionManager(dataSource));
transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
- transactionTemplate.execute(new TransactionCallback() {
- public Object doInTransaction(TransactionStatus status) {
+ transactionTemplate.execute(new TransactionCallback<Boolean>() {
+ public Boolean doInTransaction(TransactionStatus status) {
try {
jdbcTemplate.execute("CREATE TABLE CAMEL_MESSAGEPROCESSED
(processorName VARCHAR(20), messageId VARCHAR(10), createdAt timestamp)");
} catch (DataAccessException e) {