This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9504 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-it.git
commit 052a467de243a128094fb657bf0e0b913f51f42a Author: Christian Schneider <[email protected]> AuthorDate: Tue Jun 16 14:59:57 2020 +0200 SLING-9504 - Switch to protobuf --- pom.xml | 11 +++-- .../journal/it/DistributionTestSupport.java | 1 - .../journal/it/tests/AuthorDistributeTest.java | 39 ++++++++-------- .../journal/it/tests/AuthorRestartTest.java | 53 +++++++++++----------- .../journal/it/tests/ClearQueueItemTest.java | 28 ++++++------ .../journal/it/tests/PublisherReceiveTest.java | 29 ++++++------ .../distribution/journal/it/tests/ScaleUpTest.java | 10 ++-- 7 files changed, 83 insertions(+), 88 deletions(-) diff --git a/pom.xml b/pom.xml index d5af13c..c0a7161 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <!-- P R O J E C T --> <!-- ======================================================================= --> <artifactId>org.apache.sling.distribution.journal.it</artifactId> - <version>0.1.1-SNAPSHOT</version> + <version>0.1.1-JSON-SNAPSHOT</version> <name>Apache Sling Distribution Journal - IT project</name> <description> @@ -157,22 +157,23 @@ <artifactId>org.apache.sling.commons.metrics</artifactId> <version>1.2.6</version> </dependency> + <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal.messages</artifactId> - <version>0.1.7-SNAPSHOT</version> + <version>0.1.7-JSON-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal</artifactId> - <version>0.1.15-SNAPSHOT</version> + <version>0.1.15-JSON-SNAPSHOT</version> </dependency> - <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal.kafka</artifactId> - <version>0.1.5-SNAPSHOT</version> + <version>0.1.5-JSON-SNAPSHOT</version> </dependency> + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> diff --git a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java index 17e471e..f26f54f 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/DistributionTestSupport.java @@ -125,7 +125,6 @@ public class DistributionTestSupport extends TestSupport { mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver), mvn("org.apache.felix", "org.apache.felix.rootcause"), mvn("org.apache.felix", "org.apache.felix.systemready"), - mvn("com.google.protobuf", "protobuf-java"), kafka(), // The bundle built (org.apache.sling.distribution.journal) diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java index f57b35a..0ba9ea6 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorDistributeTest.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue; import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration; import java.io.Closeable; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -56,11 +57,11 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.it.DistributionTestSupport; import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.junit.Test; import org.junit.runner.RunWith; import org.ops4j.pax.exam.Configuration; @@ -149,30 +150,30 @@ public class AuthorDistributeTest extends DistributionTestSupport { assertTrue(messageSem.tryAcquire(10, TimeUnit.SECONDS)); PackageMessage pkg = recordedPackage.get(); assertEquals(PackageMessage.ReqType.ADD, pkg.getReqType()); - String path = pkg.getPathsList().iterator().next(); + String path = pkg.getPaths().iterator().next(); assertEquals("/", path); } private void simulateDiscoveryMessage(long offset) { - MessageSender<DiscoveryMessage> discSender = clientProvider.createSender(); + MessageSender<DiscoveryMessage> discSender = clientProvider.createSender(TOPIC_DISCOVERY); DiscoveryMessage disc = createDiscoveryMessage(offset); - discSender.send(TOPIC_DISCOVERY, disc); + discSender.accept(disc); } private DiscoveryMessage createDiscoveryMessage(long offset) { - SubscriberState subState = SubscriberState.newBuilder() - .setOffset(offset) - .setPubAgentName(PUB1_AGENT) + SubscriberState subState = SubscriberState.builder() + .offset(offset) + .pubAgentName(PUB1_AGENT) .build(); - return DiscoveryMessage.newBuilder() - .setSubSlingId(SUB1_SLING_ID) - .setSubAgentName(SUB1_AGENT) - .setSubscriberConfiguration(SubscriberConfiguration - .newBuilder() - .setEditable(false) - .setMaxRetries(-1) + return DiscoveryMessage.builder() + .subSlingId(SUB1_SLING_ID) + .subAgentName(SUB1_AGENT) + .subscriberConfiguration(SubscriberConfig + .builder() + .editable(false) + .maxRetries(-1) .build()) - .addSubscriberState(subState) + .subscriberStates(Arrays.asList(subState)) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java index 8983a4b..7b578b5 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/AuthorRestartTest.java @@ -46,11 +46,11 @@ import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.it.DistributionTestSupport; import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.junit.Test; import org.junit.runner.RunWith; import org.ops4j.pax.exam.Configuration; @@ -61,7 +61,6 @@ import org.ops4j.pax.exam.util.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.ByteString; /** * Starts an author instance, triggers a content distribution and checks that the package arrives @@ -111,14 +110,14 @@ public class AuthorRestartTest extends DistributionTestSupport { log.info("Sending message {}", c); } PackageMessage packageMessage = createPackageMessage(c); - clientProvider.createSender().send(TOPIC_PACKAGE, packageMessage); + clientProvider.createSender(TOPIC_PACKAGE).send(packageMessage); } try (Closeable packagePoller = createPoller()) { messageSem.tryAcquire(NUM_MESSAGES, 100, TimeUnit.SECONDS); } await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.emptySet())); DiscoveryMessage disc = createDiscoveryMessage(-1); - clientProvider.createSender().send(TOPIC_DISCOVERY, disc); + clientProvider.createSender(TOPIC_DISCOVERY).send(disc); await().until(() -> toSet(agent.getQueueNames()), equalTo(Collections.singleton(QUEUE_NAME))); log.info("Checking Items in queue"); @@ -136,19 +135,19 @@ public class AuthorRestartTest extends DistributionTestSupport { } private DiscoveryMessage createDiscoveryMessage(long offset) { - SubscriberState subState = SubscriberState.newBuilder() - .setOffset(offset) - .setPubAgentName(PUB1_AGENT) + SubscriberState subState = SubscriberState.builder() + .offset(offset) + .pubAgentName(PUB1_AGENT) .build(); - return DiscoveryMessage.newBuilder() - .setSubSlingId(SUB1_SLING_ID) - .setSubAgentName(SUB1_AGENT) - .setSubscriberConfiguration(SubscriberConfiguration - .newBuilder() - .setEditable(false) - .setMaxRetries(-1) + return DiscoveryMessage.builder() + .subSlingId(SUB1_SLING_ID) + .subAgentName(SUB1_AGENT) + .subscriberConfiguration(SubscriberConfig + .builder() + .editable(false) + .maxRetries(-1) .build()) - .addSubscriberState(subState) + .subscriberStates(Collections.singletonList(subState)) .build(); } @@ -158,14 +157,14 @@ public class AuthorRestartTest extends DistributionTestSupport { } private PackageMessage createPackageMessage(int num) throws IOException { - return PackageMessage.newBuilder() - .setPkgId("myid" + num) - .setPubSlingId("pub1sling") - .setPubAgentName(PUB1_AGENT) - .setPkgType("journal") - .setReqType(PackageMessage.ReqType.ADD) - .addAllPaths(Arrays.asList("/test")) - .setPkgBinary(ByteString.copyFrom(new byte[100])) + return PackageMessage.builder() + .pkgId("myid" + num) + .pubSlingId("pub1sling") + .pubAgentName(PUB1_AGENT) + .pkgType("journal") + .reqType(PackageMessage.ReqType.ADD) + .paths(Arrays.asList("/test")) + .pkgBinary(new byte[100]) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java index a918caa..e6ae073 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ClearQueueItemTest.java @@ -38,8 +38,7 @@ import org.apache.sling.distribution.journal.it.ext.AfterOsgi; import org.apache.sling.distribution.journal.it.ext.BeforeOsgi; import org.apache.sling.distribution.journal.it.ext.ExtPaxExam; import org.apache.sling.distribution.journal.it.kafka.KafkaLocal; - -import com.google.protobuf.ByteString; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.junit.Assert; @@ -56,7 +55,6 @@ import org.ops4j.pax.exam.spi.reactors.PerClass; import org.ops4j.pax.exam.util.Filter; import org.ops4j.pax.exam.util.PathUtils; -import static org.apache.sling.distribution.journal.messages.Messages.*; import static org.apache.commons.io.IOUtils.closeQuietly; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -247,9 +245,9 @@ public class ClearQueueItemTest extends DistributionTestSupport { private void sendInvalidPackages(int nb) throws Exception { - MessageSender<PackageMessage> sender = clientProvider.createSender(); + MessageSender<PackageMessage> sender = clientProvider.createSender(TOPIC_PACKAGE); for (int i = 0 ; i < nb ; i++) { - sender.send(TOPIC_PACKAGE, newInvalidPackage(PUB1_AGENT)); + sender.send(newInvalidPackage(PUB1_AGENT)); } } @@ -261,16 +259,16 @@ public class ClearQueueItemTest extends DistributionTestSupport { final List<String> deepPaths = Collections.emptyList(); final String pkgId = String.format("package-%s", UUID.randomUUID().toString()); - return PackageMessage.newBuilder() - .setPubSlingId("slingid") - .setPkgId(pkgId) - .setPubAgentName(agentId) - .setPkgBinary(ByteString.copyFrom(pkgBinary)) - .setPkgType("journal") - .addAllPaths(paths) - .setReqType(PackageMessage.ReqType.ADD) - .addAllDeepPaths(deepPaths) - .setPkgLength(pkgBinary.length) + return PackageMessage.builder() + .pubSlingId("slingid") + .pkgId(pkgId) + .pubAgentName(agentId) + .pkgBinary(pkgBinary) + .pkgType("journal") + .paths(paths) + .reqType(PackageMessage.ReqType.ADD) + .deepPaths(deepPaths) + .pkgLength(pkgBinary.length) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java index a013ed5..e1930c9 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/PublisherReceiveTest.java @@ -46,8 +46,7 @@ import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.it.DistributionTestSupport; import org.apache.sling.distribution.journal.it.kafka.PaxExamWithKafka; -import org.apache.sling.distribution.journal.messages.Messages; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.distribution.packaging.DistributionPackageInfo; @@ -62,8 +61,6 @@ import org.ops4j.pax.exam.util.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.ByteString; - /** * Starts a publish instance and checks that it can receive and process a PackageMessage from the journal */ @@ -115,8 +112,8 @@ public class PublisherReceiveTest extends DistributionTestSupport { Arrays.asList(bundleContext.getBundles()).stream() .forEach(bundle -> log.info(bundle.getSymbolicName() + ":" + bundle.getVersion())); DistributionPackage pkg = createDistPackage(RESOURCE_PATH); - Messages.PackageMessage pkgMsg = toPackageMessage(pkg, "agent1"); - provider.createSender().send(TOPIC_PACKAGE, pkgMsg); + PackageMessage pkgMsg = toPackageMessage(pkg, "agent1"); + provider.createSender(TOPIC_PACKAGE).send(pkgMsg); await().until(() -> getResource(RESOURCE_PATH), notNullValue()); } @@ -159,16 +156,16 @@ public class PublisherReceiveTest extends DistributionTestSupport { final List<String> deepPaths = Arrays.asList(pkgInfo.get(PROPERTY_REQUEST_DEEP_PATHS, String[].class)); final String pkgId = pkg.getId(); - return PackageMessage.newBuilder() - .setPubSlingId("slingid") - .setPkgId(pkgId) - .setPubAgentName(agentId) - .setPkgBinary(ByteString.copyFrom(pkgBinary)) - .setPkgType(pkg.getType()) - .addAllPaths(paths) - .setReqType(PackageMessage.ReqType.ADD) - .addAllDeepPaths(deepPaths) - .setPkgLength(pkgBinary.length) + return PackageMessage.builder() + .pubSlingId("slingid") + .pkgId(pkgId) + .pubAgentName(agentId) + .pkgBinary(pkgBinary) + .pkgType(pkg.getType()) + .paths(paths) + .reqType(PackageMessage.ReqType.ADD) + .deepPaths(deepPaths) + .pkgLength(pkgBinary.length) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java index 3dcb413..1513e0b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/it/tests/ScaleUpTest.java @@ -63,9 +63,9 @@ import org.apache.sling.distribution.journal.it.ClusterIdCleaner; import org.apache.sling.distribution.journal.it.DistributionTestSupport; import org.apache.sling.distribution.journal.it.FileUtil; import org.apache.sling.distribution.journal.it.kafka.KafkaLocal; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.SubscriberState; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -254,7 +254,7 @@ public class ScaleUpTest { } private void handleDiscovery(MessageInfo info, DiscoveryMessage message) { - List<SubscriberState> stateList = message.getSubscriberStateList(); + List<SubscriberState> stateList = message.getSubscriberStates(); String slingId = message.getSubSlingId(); OptionalLong minOffset = stateList.stream().mapToLong(state -> state.getOffset()).min(); LOG.info("DiscoveryMessage slingid {} received {} states {}", slingId, minOffset, stateList); @@ -266,7 +266,7 @@ public class ScaleUpTest { if (message.getReqType() == PackageMessage.ReqType.TEST) { return; } - LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPathsList()); + LOG.info("PackageMessage received {}, paths {}", info.getOffset(), message.getPaths()); this.lastPackageOffset = info.getOffset(); packageReceived.release(); }
