This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git
The following commit(s) were added to refs/heads/master by this push:
new 45a9fbd SLING-9472 - Check queue state via http to author
45a9fbd is described below
commit 45a9fbda740f3025cbc094f486ce86f2f9dfd22a
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 30 11:41:21 2020 +0200
SLING-9472 - Check queue state via http to author
---
.../sling/distribution/journal/it/Client.java | 132 +++++++++++++++++++++
.../journal/it/DistributionTestBase.java | 94 +--------------
.../journal/it/DistributionTestSupport.java | 12 +-
.../journal/it/tests/PublisherReceiveTest.java | 13 +-
.../it/tests/StagedDistributionFailureTest.java | 35 +++---
.../journal/it/tests/StagedDistributionTest.java | 23 ++--
src/test/resources/logback.xml | 2 +
7 files changed, 175 insertions(+), 136 deletions(-)
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/Client.java b/src/test/java/org/apache/sling/distribution/journal/it/Client.java
new file mode 100644
index 0000000..f85009a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/it/Client.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.it;
+
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+public class Client {
+ private static final String ADMIN_USER = "admin";
+ private static final String ADMIN_PASSWORD = "admin";
+ private static final int TIMEOUT_SECONDS = 60;
+
+ protected static Logger LOG = LoggerFactory.getLogger(Client.class);
+
+ public static void waitNumQueues(int num) {
+ await("Waiting for " + num + " queues to be present")
+ .pollInterval(1, TimeUnit.SECONDS)
+ .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .until(Client::numQueues, equalTo(num));
+ }
+
+ public static void waitSumQueueSizes(int num) {
+ await("Wait for all queues to have size sum " + num)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .timeout(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .ignoreExceptions()
+ .until(Client::sumQueueSizes, equalTo(num));
+ }
+
+ public static int numQueues() {
+ return getQueues(DistributionTestBase.PUB1_AGENT).size();
+ }
+
+ public static int sumQueueSizes() {
+ Map<String, Integer> queues = Client.getQueues(DistributionTestBase.PUB1_AGENT);
+
+ return queues.values().stream().mapToInt(Integer::valueOf).sum();
+ }
+
+ public static Map<String, Integer> getQueues(String agentName) {
+ String uri = String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json",
+ 8181,
+ agentName);
+ JsonObject root = getJson(uri).getAsJsonObject();
+ Map<String, Integer> queues = new HashMap<>();
+ root.entrySet().stream()
+ .filter(entry -> !entry.getKey().equals("items"))
+ .filter(entry -> !entry.getKey().equals("sling:resourceType"))
+ .forEach(entry -> {
+ queues.put(entry.getKey(), getItemsCount(entry));
+ });
+ LOG.info("Queue sizes {}", queues);
+ return queues;
+ }
+
+ private static int getItemsCount(Entry<String, JsonElement> entry) {
+ return entry.getValue().getAsJsonObject().get("itemsCount").getAsInt();
+ }
+
+ public static List<String> getSubNodes(String uri) {
+ LOG.info("Trying to get queue from {}", uri);
+ JsonElement root = getJson(uri);
+ List<String> result = new ArrayList<>();
+ JsonArray items = root.getAsJsonObject().get("items").getAsJsonArray();
+ items.forEach(item -> result.add(item.getAsString()));
+ return result;
+ }
+
+ private static JsonElement getJson(String uri) {
+ try (CloseableHttpClient client = createHttpClient()) {
+ HttpGet httpGet = new HttpGet(uri);
+ CloseableHttpResponse response = client.execute(httpGet);
+ InputStreamReader reader = new InputStreamReader(response.getEntity().getContent(), Charset.forName("utf-8"));
+ return new JsonParser().parse(reader);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot get json for uri " + uri, e);
+ }
+ }
+
+ protected static CloseableHttpClient createHttpClient() {
+ return HttpClientBuilder.create()
+ .setDefaultCredentialsProvider(credentialsProvider()).build();
+ }
+
+ private static CredentialsProvider credentialsProvider() {
+ CredentialsProvider provider = new BasicCredentialsProvider();
+ UsernamePasswordCredentials credentials = new UsernamePasswordCredentials(ADMIN_USER, ADMIN_PASSWORD);
+ provider.setCredentials(AuthScope.ANY, credentials);
+ return provider;
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
index 01464e2..209e98d 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestBase.java
@@ -19,24 +19,18 @@
package org.apache.sling.distribution.journal.it;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -53,12 +47,8 @@ import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.Distributor;
import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.it.kafka.KafkaLocal;
-import org.awaitility.Duration;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.ops4j.pax.exam.Configuration;
@@ -78,11 +68,7 @@ public class DistributionTestBase extends
DistributionTestSupport {
private static KafkaLocal kafka;
private static final String RESOURCE_TYPE = "sling:Folder";
- private static final String PUB1_AGENT = "agent1";
-
- @Inject
- @Filter(value = "(name=agent1)", timeout = 40000L)
- DistributionAgent agent;
+ public static final String PUB1_AGENT = "agent1";
@Inject
@Filter
@@ -127,14 +113,14 @@ public class DistributionTestBase extends
DistributionTestSupport {
}
- public static TestContainer startPublishInstance(int httpPort, String
agentName, boolean editable, String stageAgentName) {
+ public static TestContainer startPublishInstance(int httpPort, String
agentName, boolean editable, boolean stagingPrecondition) {
ExamSystem testSystem;
try {
String workdir = String.format("%s/target/paxexam/%s",
PathUtils.getBaseDir(), "publish-" + httpPort + "-" +
UUID.randomUUID().toString());
Option[] config = CoreOptions.options( //
new
DistributionTestSupport().withHttpPort(httpPort).baseConfiguration(workdir), //
defaultOsgiConfigs(), //
- publishOsgiConfigs(agentName, editable, stageAgentName), //
+ publishOsgiConfigs(agentName, editable,
stagingPrecondition), //
CoreOptions.workingDirectory(workdir)
);
@@ -144,6 +130,7 @@ public class DistributionTestBase extends
DistributionTestSupport {
}
TestContainer container = PaxExamRuntime.createContainer(testSystem);
container.start();
+ LOG.info("Container with port {} started.", httpPort);
return container;
}
@@ -162,16 +149,6 @@ public class DistributionTestBase extends
DistributionTestSupport {
return response.isSuccessful();
}
- private List<String> queueNames() {
- List<String> queueNames = new ArrayList<>();
- agent.getQueueNames().forEach(queueNames::add);
- return queueNames;
- }
-
- protected int getQueueItems(String queueName) {
- return agent.getQueue(queueName).getStatus().getItemsCount();
- }
-
@SuppressWarnings({ "deprecation" })
private ResourceResolver createResolver() {
try {
@@ -215,67 +192,4 @@ public class DistributionTestBase extends
DistributionTestSupport {
.until(() -> tryGetPath(httpPort, path), equalTo(200));
}
- public Iterable<String> waitSubQueues(String... queues) {
- Matcher<String>[] matchers = containsNames(queues);
- Iterable<String> queueNames = await().atMost(Duration.ONE_MINUTE)
- .pollInterval(Duration.FIVE_SECONDS)
- .until(this::queueNames, containsInAnyOrder(matchers));
- LOG.info("Subscriber Queues: " + String.join(", ", queueNames));
- return queueNames;
- }
-
- @SuppressWarnings("unchecked")
- private Matcher<String>[] containsNames(String... queues) {
- return Stream.of(queues)
- .map(name -> Matchers.containsString(name))
- .toArray(Matcher[]::new);
- }
-
- protected void waitEmptySubQueues() {
- List<String> names = queueNames();
- for (String name : names) {
- await("Queue " + name + "empty")
- .atMost(60, TimeUnit.SECONDS)
- .until(() -> getQueueItems(name), equalTo(0));
- }
- }
-
-
- static protected void waitQueueItems(int httpPort, String agentName, int
count) {
- await("Waiting for number of items in queue.")
- .atMost(Duration.ONE_MINUTE)
- .pollInterval(Duration.FIVE_SECONDS)
- .until(() -> tryGetQueueItems(httpPort, agentName),
equalTo(count));
- LOG.info("Items count {} for agent {}", count, agentName + "-" +
httpPort);
-
- }
-
- static private int tryGetQueueItems(int httpPort, String agentName) {
- String url =
String.format("http://localhost:%s/libs/sling/distribution/services/agents/%s/queues.2.json",
httpPort, agentName);
- HttpGet httpGet = new HttpGet(url);
- Header authHeader = null;
- try (CloseableHttpClient client = HttpClients.createDefault()) {
- authHeader = new BasicScheme().authenticate(new
UsernamePasswordCredentials("admin", "admin"), httpGet, null);
- httpGet.addHeader(authHeader);
-
-
- CloseableHttpResponse response = client.execute(httpGet);
- String text = IOUtils.toString(response.getEntity().getContent(),
Charset.defaultCharset());
- if (text == null) {
- return -1;
- }
-
- String itemsCount = StringUtils.substringBetween(text,
"itemsCount\":", ",");
- if (itemsCount == null) {
- return -1;
- }
-
- return Integer.parseInt(itemsCount.trim());
- } catch (Throwable e) {
- LOG.error("cannot get items count {}", url, e);
- }
- return -1;
- }
-
-
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
index f26f54f..545ddcc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java
@@ -118,6 +118,7 @@ public class DistributionTestSupport extends TestSupport {
paxTinybundles(),
SlingOptions.logback(),
mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver),
+ mvn("com.google.code.gson", "gson"),
// The base sling Quickstart
slingQuickstart(baseDirectory),
@@ -298,13 +299,10 @@ public class DistributionTestSupport extends TestSupport {
* OSGI configuration targeted to the publish instances only
*/
public static Option publishOsgiConfigs(String agentName) {
- return publishOsgiConfigs(agentName, true, null);
-
+ return publishOsgiConfigs(agentName, true, false);
}
- protected static Option publishOsgiConfigs(String agentName, boolean
editable, String stage) {
-
-
+ protected static Option publishOsgiConfigs(String agentName, boolean
editable, boolean stagingPrecondition) {
return composite(
factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
.put("kind", "agent")
@@ -313,9 +311,9 @@ public class DistributionTestSupport extends TestSupport {
factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
.put("name", agentName)
- .put("agentNames", new String[]{"agent1"})
+ .put("agentNames", new String[]{ "agent1"})
.put("packageBuilder.target", "(name=journal)")
- .put("precondition.target", stage != null ?
"(name=staging)" : "(name=default)")
+ .put("precondition.target", stagingPrecondition ?
"(name=staging)" : "(name=default)")
.put("editable", editable)
.put("announceDelay", "500")
.asOption());
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
index e1930c9..9d4e2ed 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java
@@ -41,7 +41,6 @@ import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.SimpleDistributionRequest;
-import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.it.DistributionTestSupport;
@@ -81,15 +80,7 @@ public class PublisherReceiveTest extends
DistributionTestSupport {
private DistributionPackageBuilder packageBuilder;
@Inject
- @Filter("(name=subscriber-agent1)")
- DistributionAgent subscriber;
-
- @Inject
MessagingProvider provider;
- /*
- @Inject
- ServiceUserMapper serviceUserMapper;
- */
@Configuration
public Option[] configuration() {
@@ -109,8 +100,8 @@ public class PublisherReceiveTest extends
DistributionTestSupport {
@Test
public void testReceive() throws Exception {
- Arrays.asList(bundleContext.getBundles()).stream()
- .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" +
bundle.getVersion()));
+ Arrays.asList(bundleContext.getBundles()).stream()
+ .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" +
bundle.getVersion()));
DistributionPackage pkg = createDistPackage(RESOURCE_PATH);
PackageMessage pkgMsg = toPackageMessage(pkg, "agent1");
provider.createSender(TOPIC_PACKAGE).send(pkgMsg);
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
index 600eb0c..28756df 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionFailureTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.sling.distribution.journal.it.tests;
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.it.Client;
import org.apache.sling.distribution.journal.it.DistributionTestBase;
import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
@@ -29,38 +32,38 @@ import org.ops4j.pax.exam.TestContainer;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerClass;
-import java.io.IOException;
-
@RunWith(ExtPaxExam.class)
@ExamReactorStrategy(PerClass.class)
public class StagedDistributionFailureTest extends DistributionTestBase {
- private static final String SUB1_AGENT = "subscriber";
- private static final String SUB2_AGENT = "subscriber";
-
+ private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber";
+ private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber";
private static volatile TestContainer publish;
private static volatile TestContainer golden_publish;
-
private static final String TEST_PATH = "/content/mytest";
@BeforeOsgi
public static void beforeOsgi() throws Exception {
beforeOsgiBase();
- publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT);
+ publish = startPublishInstance(8182, SUB1_AGENT, false, true);
new
Thread(StagedDistributionFailureTest::delayedStartGoldenSubscriber).start();
}
-
+
/**
* Wait for at least one item in publish queue before starting golden
publish
*/
private static void delayedStartGoldenSubscriber() {
- waitQueueItems(8182, SUB1_AGENT, 1);
- LOG.info("Starting golden publish");
- golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+ try {
+ Client.waitSumQueueSizes(1);
+ LOG.info("Starting golden publish");
+ golden_publish = startPublishInstance(8183, SUB2_AGENT, true,
false);
+ } catch (Exception e) {
+ LOG.error("Start of golden subscriber failed with: " +
e.getMessage(), e);
+ }
}
@AfterOsgi
@@ -78,8 +81,7 @@ public class StagedDistributionFailureTest extends
DistributionTestBase {
@Before
public void before() {
createPath(TEST_PATH);
-
- waitSubQueues(SUB1_AGENT);
+ Client.waitNumQueues(1);
}
/**
@@ -90,13 +92,14 @@ public class StagedDistributionFailureTest extends
DistributionTestBase {
*/
@Test
public void testDistribute() {
-
distribute(TEST_PATH);
- waitSubQueues(SUB1_AGENT, SUB2_AGENT);
- waitEmptySubQueues();
+ Client.waitNumQueues(2);
+ Client.waitSumQueueSizes(0);
waitPath(8182, TEST_PATH);
waitPath(8183, TEST_PATH);
}
+
+
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
index bbd103f..f47f027 100644
--- a/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/StagedDistributionTest.java
@@ -18,28 +18,28 @@
*/
package org.apache.sling.distribution.journal.it.tests;
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.it.Client;
import org.apache.sling.distribution.journal.it.DistributionTestBase;
import org.apache.sling.distribution.journal.it.ext.AfterOsgi;
import org.apache.sling.distribution.journal.it.ext.BeforeOsgi;
import org.apache.sling.distribution.journal.it.ext.ExtPaxExam;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.TestContainer;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerClass;
-import java.io.IOException;
-
-@Ignore(value = "Switched off as this test does not seem to work on jenkins. Locally it works.")
+//@Ignore(value = "Switched off as this test does not seem to work on jenkins. Locally it works.")
@RunWith(ExtPaxExam.class)
@ExamReactorStrategy(PerClass.class)
public class StagedDistributionTest extends DistributionTestBase {
- private static final String SUB1_AGENT = "subscriber-regular";
- private static final String SUB2_AGENT = "subscriber-golden";
-
+ private static final String SUB1_AGENT = PUB1_AGENT + "Subscriber";
+ private static final String SUB2_AGENT = PUB1_AGENT + "Subscriber";
+
private static TestContainer golden_publish;
private static TestContainer publish;
@@ -50,8 +50,8 @@ public class StagedDistributionTest extends
DistributionTestBase {
@BeforeOsgi
public static void beforeOsgi() throws Exception {
beforeOsgiBase();
- publish = startPublishInstance(8182, SUB1_AGENT, false, SUB2_AGENT);
- golden_publish = startPublishInstance(8183, SUB2_AGENT, true, null);
+ publish = startPublishInstance(8182, SUB1_AGENT, false, true);
+ golden_publish = startPublishInstance(8183, SUB2_AGENT, true, false);
}
@@ -69,8 +69,7 @@ public class StagedDistributionTest extends
DistributionTestBase {
@Before
public void before() {
createPath(TEST_PATH);
-
- waitSubQueues(SUB1_AGENT, SUB2_AGENT);
+ Client.waitNumQueues(2);
}
@Test
@@ -78,7 +77,7 @@ public class StagedDistributionTest extends
DistributionTestBase {
distribute(TEST_PATH);
- waitEmptySubQueues();
+ Client.waitSumQueueSizes(0);
waitPath(8182, TEST_PATH);
waitPath(8183, TEST_PATH);
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index 5f21dd4..57f012c 100644
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -28,7 +28,9 @@
<appender-ref ref="console"/>
</root>
+
<logger name="kafka" level="WARN"/>
+ <logger name="org.apache.sling.distribution.journal.kafka.KafkaJsonMessageSender" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.apache.kafka.clients" level="WARN"/>
<logger name="org.apache.sling.jcr.repoinit.impl" level="WARN"/>