This is an automated email from the ASF dual-hosted git repository.
aldettinger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 14fcebe Salesforce: Expand Consumer integration tests : add tests for
Streaming API
14fcebe is described below
commit 14fcebea86410ed83479344902f3bc8ff1cc2c66
Author: Zineb Bendhiba <[email protected]>
AuthorDate: Mon Aug 9 15:40:20 2021 +0200
Salesforce: Expand Consumer integration tests : add tests for Streaming API
---
.../salesforce/deployment/SalesforceProcessor.java | 6 ++++
.../component/salesforce/SalesforceResource.java | 40 ++++++++++++++++++++++
.../component/salesforce/SalesforceRoutes.java | 16 +++++++++
.../salesforce/SalesforceIntegrationTest.java | 39 +++++++++++++++++++--
4 files changed, 99 insertions(+), 2 deletions(-)
diff --git
a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
index 5017515..3aacea1 100644
---
a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
+++
b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
@@ -23,6 +23,7 @@ import
io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import org.apache.camel.component.salesforce.api.dto.AbstractDTOBase;
+import org.apache.camel.component.salesforce.internal.dto.PushTopic;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
@@ -58,8 +59,13 @@ class SalesforceProcessor {
.stream()
.map(classInfo -> classInfo.name().toString())
.filter(className ->
className.startsWith("org.apache.camel.component.salesforce.internal.dto"))
+ // it is registred below with fields accessible
+ .filter(className -> className != PushTopic.class.getName())
.toArray(String[]::new);
reflectiveClass.produce(new ReflectiveClassBuildItem(true, false,
internalDtoClasses));
+
+ // enabling the search for private fields : related to issue
https://issues.apache.org/jira/browse/CAMEL-16860
+ reflectiveClass.produce(new ReflectiveClassBuildItem(true, true,
PushTopic.class));
}
}
diff --git
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
index 0520bdd..6f4d184 100644
---
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
+++
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
@@ -52,6 +52,8 @@ import
org.apache.camel.component.salesforce.api.dto.bulk.ContentType;
import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo;
import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum;
import org.apache.camel.component.salesforce.api.utils.QueryHelper;
+import org.apache.camel.component.salesforce.internal.dto.PushTopic;
+import
org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic;
import org.apache.camel.quarkus.component.salesforce.generated.Account;
import
org.apache.camel.quarkus.component.salesforce.generated.QueryRecordsAccount;
import
org.apache.camel.quarkus.component.salesforce.model.GlobalObjectsAndHeaders;
@@ -249,4 +251,42 @@ public class SalesforceResource {
return template.to("salesforce:limits").request(Limits.class);
}
+ @Path("streaming")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public String getSubscribedObjects() {
+ Account account = consumerTemplate.receiveBody("seda:CamelTestTopic",
10000, Account.class);
+ return account.getName();
+ }
+
+ @Path("streaming/raw")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public String getRawSubscribedObjects() {
+ return consumerTemplate.receiveBody("seda:RawPayloadCamelTestTopic",
10000, String.class);
+ }
+
+ @Path("/topic/{id}")
+ @DELETE
+ public Response deleteTopic(@PathParam("id") String topicId) {
+ PushTopic topic = new PushTopic();
+ topic.setId(topicId);
+
+ template.to("salesforce:deleteSObject")
+ .withBody(topic)
+ .send();
+
+ return Response.noContent().build();
+ }
+
+ @Path("/topic")
+ @GET
+ public String getTopicId() {
+ QueryRecordsPushTopic queryRecordsPushTopic = template
+ .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic
WHERE Name = 'CamelTestTopic'&"
+ + "sObjectClass=" +
QueryRecordsPushTopic.class.getName())
+ .request(QueryRecordsPushTopic.class);
+
+ return queryRecordsPushTopic.getRecords().get(0).getId();
+ }
}
diff --git
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
index 387e701..66658b8 100644
---
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
+++
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
@@ -24,6 +24,7 @@ import javax.inject.Named;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.salesforce.AuthenticationType;
import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.quarkus.component.salesforce.generated.Account;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@@ -69,8 +70,23 @@ public class SalesforceRoutes extends RouteBuilder {
Optional<String> wireMockUrl =
ConfigProvider.getConfig().getOptionalValue("wiremock.url", String.class);
// Wiremock used only with Templates - this Route is used only with
Salesforce credentials
if (!wireMockUrl.isPresent()) {
+
+ // Change Data Capture
from("salesforce:/data/AccountChangeEvent?replayId=-1").routeId("cdc").autoStartup(false)
.to("seda:events");
+
+ // Streaming API : topic consumer - getting Account object
+ from("salesforce:CamelTestTopic?notifyForFields=ALL&"
+ +
"notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
+ + "sObjectClass=" + Account.class.getName() +
"&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Account")
+ .to("seda:CamelTestTopic");
+
+ // Streaming API : topic consumer with RAW Payload - getting json
as String
+
from("salesforce:CamelTestTopic?rawPayload=true&notifyForFields=ALL&"
+ +
"notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
+ + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM
Account")
+ .to("seda:RawPayloadCamelTestTopic");
+
}
}
}
diff --git
a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
index 91c68d8..5dffd4e 100644
---
a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
+++
b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
@@ -23,7 +23,9 @@ import io.restassured.RestAssured;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@EnabledIfEnvironmentVariable(named = "SALESFORCE_USERNAME", matches = ".+")
@EnabledIfEnvironmentVariable(named = "SALESFORCE_PASSWORD", matches = ".+")
@@ -33,8 +35,9 @@ import static org.hamcrest.Matchers.is;
public class SalesforceIntegrationTest {
@Test
- public void testChangeDataCaptureEvents() {
+ public void testCDCAndStreamingEvents() {
String accountId = null;
+ String topicId = null;
try {
// Start the Salesforce CDC consumer
RestAssured.post("/salesforce/cdc/start")
@@ -42,7 +45,7 @@ public class SalesforceIntegrationTest {
.statusCode(200);
// Create an account
- String accountName = "Camel Quarkus Account Test: " +
UUID.randomUUID().toString();
+ String accountName = "Camel Quarkus Account Test: " +
UUID.randomUUID();
accountId = RestAssured.given()
.body(accountName)
.post("/salesforce/account")
@@ -58,6 +61,30 @@ public class SalesforceIntegrationTest {
.then()
.statusCode(200)
.body("Name", is(accountName));
+
+ // Verify we can stream the Account as Object
+ RestAssured.given()
+ .get("/salesforce/streaming")
+ .then()
+ .statusCode(200)
+ .body(equalTo(accountName));
+
+ // Verify we can stream the Account as Raw payload
+ RestAssured.given()
+ .get("/salesforce/streaming/raw")
+ .then()
+ .statusCode(200)
+ .body("Name", equalTo(accountName));
+
+ // Get the topic ID
+ topicId = RestAssured.given()
+ .get("/salesforce/topic")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .asString();
+ assertNotNull(topicId);
} finally {
// Shut down the CDC consumer
RestAssured.post("/salesforce/cdc/stop")
@@ -70,6 +97,14 @@ public class SalesforceIntegrationTest {
.then()
.statusCode(204);
}
+
+ // delete the topic
+ if (topicId != null) {
+ RestAssured.delete("/salesforce/topic/" + topicId)
+ .then()
+ .statusCode(204);
+ }
}
}
+
}