This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9504 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git
commit 205530997adfc15cc9812ac204cc543b14bcc1df Author: Christian Schneider <[email protected]> AuthorDate: Mon Jun 15 18:46:31 2020 +0200 SLING-9504 - Migrate to json --- pom.xml | 83 ++++++--------------- .../sling/distribution/journal/FullMessage.java | 4 +- .../sling/distribution/journal/HandlerAdapter.java | 36 +++------ .../sling/distribution/journal/MessageSender.java | 17 +++-- .../distribution/journal/MessagingProvider.java | 28 ++----- ...geDistributedMessage.java => ClearCommand.java} | 29 +++++--- ...stributedMessage.java => DiscoveryMessage.java} | 29 +++++--- .../messages/PackageDistributedMessage.java | 9 +++ ...DistributedMessage.java => PackageMessage.java} | 41 +++++++--- .../journal/messages/PackageStatusMessage.java | 71 ++++++++++++++++++ .../PingMessage.java} | 17 +++-- ...stributedMessage.java => SubscriberConfig.java} | 25 ++++--- ...istributedMessage.java => SubscriberState.java} | 27 ++++--- .../sling/distribution/journal/messages/Types.java | 51 ------------- .../journal/messages/package-info.java | 2 +- .../sling/distribution/journal/package-info.java | 2 +- src/main/protobuf/messages.proto | 85 --------------------- .../distribution/journal/messages/SerTest.java | 87 ---------------------- 18 files changed, 250 insertions(+), 393 deletions(-) diff --git a/pom.xml b/pom.xml index 9a983d8..5b11aff 100644 --- a/pom.xml +++ b/pom.xml @@ -1,27 +1,21 @@ <?xml version="1.0"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <!-- ======================================================================= --> - <!-- P A R E N T P R O J E C T --> + <!-- P A R E N T P R O J E C T --> <!-- ======================================================================= --> <parent> <groupId>org.apache.sling</groupId> @@ -31,10 +25,10 @@ </parent> <!-- ======================================================================= --> - <!-- P R O J E C T --> + <!-- P R O J E C T --> <!-- ======================================================================= --> <artifactId>org.apache.sling.distribution.journal.messages</artifactId> - <version>0.1.6</version> + <version>0.1.7-JSON-SNAPSHOT</version> <name>Apache Sling Journal based Content Distribution - Messages bundle</name> <description>Implementation of the messages to support Apache Sling Content Distribution on top of an append-only persisted log</description> @@ -47,50 +41,19 @@ <connection>scm:git:https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git</connection> <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git</developerConnection> <url>https://gitbox.apache.org/repos/asf?p=sling-org-apache-sling-distribution-journal-messages.git</url> - <tag>org.apache.sling.distribution.journal.messages-0.1.6</tag> - </scm> + <tag>org.apache.sling.distribution.journal.messages-0.1.2</tag> + </scm> <!-- ======================================================================= --> - <!-- B U I L D --> - <!-- ======================================================================= --> - <build> - <plugins> - <plugin> - <groupId>com.github.os72</groupId> - <artifactId>protoc-jar-maven-plugin</artifactId> - <version>3.5.1.1</version> - <executions> - <execution> - <phase>generate-sources</phase> - <goals> - <goal>run</goal> - </goals> - <configuration> - <protocVersion>2.5.0</protocVersion> - <outputTargets> - <outputTarget> - <type>java</type> - <addSources>true</addSources> - <outputDirectory>target/generated-sources/protoc-jar</outputDirectory> - </outputTarget> - </outputTargets> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <!-- ======================================================================= --> - <!-- D E P E N D E N C I E S --> + <!-- D E P E N D E N C I E S --> <!-- ======================================================================= --> <dependencies> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>3.7.0</version> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.12</version> + <scope>provided</scope> </dependency> - <dependency> <groupId>org.osgi</groupId> <artifactId>osgi.cmpn</artifactId> diff --git a/src/main/java/org/apache/sling/distribution/journal/FullMessage.java b/src/main/java/org/apache/sling/distribution/journal/FullMessage.java index 444c209..b3efd88 100644 --- a/src/main/java/org/apache/sling/distribution/journal/FullMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/FullMessage.java @@ -18,9 +18,7 @@ */ package org.apache.sling.distribution.journal; -import com.google.protobuf.GeneratedMessage; - -public class FullMessage<T extends GeneratedMessage> { +public class FullMessage<T> { private MessageInfo info; private T message; diff --git a/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java b/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java index 1b59c78..fc9a2a0 100644 --- a/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java +++ b/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java @@ -18,40 +18,17 @@ */ package org.apache.sling.distribution.journal; -import java.lang.reflect.Method; - -import com.google.protobuf.ByteString; -import com.google.protobuf.ExtensionRegistryLite; - public class HandlerAdapter<T> { - private final MessageHandler<T> handler; - private final Method method; - private final ExtensionRegistryLite registry; private final Class<T> type; + private final MessageHandler<T> handler; - public HandlerAdapter(Class<T> type, MessageHandler<T> handler) { - this.type = type; - this.handler = handler; - try { - method = type.getMethod("parseFrom", ByteString.class, ExtensionRegistryLite.class); - } catch (Exception e) { - throw new IllegalArgumentException(e); - } - registry = ExtensionRegistryLite.newInstance(); - } - public static <T> HandlerAdapter<T> create(Class<T> type, MessageHandler<T> handler) { return new HandlerAdapter<>(type, handler); } - @SuppressWarnings("unchecked") - private T parseFrom(ByteString payloadBytes) throws Exception { - return (T) method.invoke(null, payloadBytes, registry); - } - - public void handle(MessageInfo info, ByteString payloadBytes) throws Exception { - T payload = parseFrom(payloadBytes); - handler.handle(info, payload); + private HandlerAdapter(Class<T> type, MessageHandler<T> handler) { + this.type = type; + this.handler = handler; } public Class<?> getType() { @@ -61,4 +38,9 @@ public class HandlerAdapter<T> { public MessageHandler<T> getHandler() { return this.handler; } + + @Override + public String toString() { + return "Message handler for type=" + type.getName() + ", handler=" + handler.getClass().getName(); + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java index 0a7f668..6b149c9 100644 --- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java +++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java @@ -19,13 +19,20 @@ package org.apache.sling.distribution.journal; import java.util.Map; +import java.util.function.Consumer; -import com.google.protobuf.GeneratedMessage; +public interface MessageSender<T> extends Consumer<T> { -public interface MessageSender<T extends GeneratedMessage> { - - void send(String topic, T payload) throws MessagingException; + /** + * Make sure every MessageSender also acts as Consumer + */ + @Override + default void accept(T payload) { + send(payload); + } + + void send(T payload) throws MessagingException; - void send(String topic, T payload, Map<String, String> properties) throws MessagingException; + void send(T payload, Map<String, String> properties) throws MessagingException; } diff --git a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java index c4d49f9..cf8cbbe 100644 --- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java @@ -19,34 +19,20 @@ package org.apache.sling.distribution.journal; import java.io.Closeable; -import java.util.function.Consumer; - -import com.google.protobuf.GeneratedMessage; public interface MessagingProvider { - <T extends GeneratedMessage> MessageSender<T> createSender(); - - /** - * Create a sender for a fixed topic - * - * @param <T> - * @param topic - * @return - */ - default <T extends GeneratedMessage> Consumer<T> createSender(String topic) { - MessageSender<GeneratedMessage> sender = createSender(); - return payload -> sender.send(topic, payload); - } + <T> MessageSender<T> createSender(String topic); - <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters); + default Closeable createPoller( + String topicName, + Reset reset, + HandlerAdapter<?> ... adapters) { + return createPoller(topicName, reset, null, adapters); + } Closeable createPoller(String topicName, Reset reset, String assign, HandlerAdapter<?>... adapters); - <T> JsonMessageSender<T> createJsonSender(); - - <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type); - void assertTopic(String topic) throws MessagingException; long retrieveOffset(String topicName, Reset reset); diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java similarity index 70% copy from src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java copy to src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java index b26dea6..42b6d51 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java @@ -18,15 +18,24 @@ */ package org.apache.sling.distribution.journal.messages; -public class PackageDistributedMessage { +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - public String pubAgentName; - - public String packageId; - - public String[] deepPaths; - - public String[] paths; - - public long offset; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ClearCommand { + + // Subscriber agent Sling identifier + String subSlingId; + + // Subscriber agent name + String subAgentName; + + String pubAgentName; + + long offset; } diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java similarity index 66% copy from src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java copy to src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java index b26dea6..1cce06b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java @@ -18,15 +18,26 @@ */ package org.apache.sling.distribution.journal.messages; -public class PackageDistributedMessage { +import java.util.List; - public String pubAgentName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - public String packageId; - - public String[] deepPaths; - - public String[] paths; - - public long offset; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DiscoveryMessage { + + // Subscriber agent Sling identifier + String subSlingId; + + // Subscriber agent name + String subAgentName; + + SubscriberConfig subscriberConfiguration; + + List<SubscriberState> subscriberStates; } diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java index b26dea6..463a9a4 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java @@ -18,6 +18,15 @@ */ package org.apache.sling.distribution.journal.messages; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class PackageDistributedMessage { public String pubAgentName; diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java similarity index 56% copy from src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java copy to src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java index b26dea6..0802541 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java @@ -18,15 +18,38 @@ */ package org.apache.sling.distribution.journal.messages; -public class PackageDistributedMessage { +import java.util.ArrayList; +import java.util.List; - public String pubAgentName; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - public String packageId; - - public String[] deepPaths; - - public String[] paths; - - public long offset; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PackageMessage { + String pubSlingId; + ReqType reqType; + String pkgId; + String pkgType; + long pkgLength; + byte[] pkgBinary; + String pkgBinaryRef; + String pubAgentName; + String userId; + + @Builder.Default + List<String> paths = new ArrayList<>(); + + @Builder.Default + List<String> deepPaths = new ArrayList<>(); + + public enum ReqType { + ADD, + DELETE, + TEST; + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java new file mode 100644 index 0000000..b8e5b15 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.messages; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PackageStatusMessage { + + String subSlingId; + String subAgentName; + String pubAgentName; + long offset; + PackageStatusMessage.Status status; + + public enum Status { + /** + * The package has been removed automatically after failing every retry attempts + */ + REMOVED_FAILED(0), + /** + * The package has been removed explicitly + */ + REMOVED(1), + /** + * The package has been imported + */ + IMPORTED(2); + + private int number; + + Status(int number) { + this.number = number; + } + + public int getNumber() { + return number; + } + + public static Status fromNumber(int number) { + switch (number) { + case 0: return REMOVED_FAILED; + case 1: return REMOVED; + case 2: return IMPORTED; + } + throw new IllegalStateException("Unknown number " + number); + } + } +} diff --git a/src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java similarity index 75% rename from src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java rename to src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java index 82a4813..0fab986 100644 --- a/src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java @@ -16,10 +16,17 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal; +package org.apache.sling.distribution.journal.messages; -public interface JsonMessageSender<T> { +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - void send(String topic, T payload); - -} \ No newline at end of file +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class PingMessage { + String message; +} diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java similarity index 64% copy from src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java copy to src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java index b26dea6..10f6fa5 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java @@ -18,15 +18,22 @@ */ package org.apache.sling.distribution.journal.messages; -public class PackageDistributedMessage { +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - public String pubAgentName; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubscriberConfig { + boolean editable; - public String packageId; - - public String[] deepPaths; - - public String[] paths; - - public long offset; + /** + * The max number of retry attempts to process this package. A value smaller + * than zero indicates an infinite number of retry attempts. A value greater or + * equal to zero indicates a specific number of retry attempts. + */ + int maxRetries; } diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java similarity index 67% copy from src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java copy to src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java index b26dea6..1513f21 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java @@ -18,15 +18,22 @@ */ package org.apache.sling.distribution.journal.messages; -public class PackageDistributedMessage { +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; - public String pubAgentName; - - public String packageId; - - public String[] deepPaths; - - public String[] paths; - - public long offset; +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubscriberState { + // Publisher agent name + String pubAgentName; + + // Last processed offset on the Subscriber agent + long offset; + + // Nb of retries for the current offset on the Subscriber agent + int retries; } diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/Types.java b/src/main/java/org/apache/sling/distribution/journal/messages/Types.java deleted file mode 100644 index f49a103..0000000 --- a/src/main/java/org/apache/sling/distribution/journal/messages/Types.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.messages; - -import java.util.HashMap; -import java.util.Map; - -public class Types { - private static Map<Class<?>, Integer> types = new HashMap<>(); - static { - types.put(Messages.DiscoveryMessage.class, 1); - types.put(Messages.PackageMessage.class, 2); - types.put(Messages.PackageStatusMessage.class, 3); - types.put(Messages.CommandMessage.class, 4); - types.put(Messages.PingMessage.class, 5); - } - - public static Class<?> getType(int type, int version) { - for (Class<?> clazz : types.keySet()) { - Integer cType = types.get(clazz); - if (cType == type && version == 1) { - return clazz; - } - } - throw new IllegalArgumentException(String.format("Unknown type %d and version %d", type, version)); - } - - public static Integer getType(Class<?> clazz) { - return types.get(clazz); - } - - public static Integer getVersion(Class<?> clazz) { - return 1; - } -} diff --git a/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java b/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java index f69a41d..093cdc1 100644 --- a/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java +++ b/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java @@ -16,6 +16,6 @@ * specific language governing permissions and limitations * under the License. */ [email protected]("0.1.0") [email protected]("1.0.0") package org.apache.sling.distribution.journal.messages; diff --git a/src/main/java/org/apache/sling/distribution/journal/package-info.java b/src/main/java/org/apache/sling/distribution/journal/package-info.java index 444f520..43650db 100644 --- a/src/main/java/org/apache/sling/distribution/journal/package-info.java +++ b/src/main/java/org/apache/sling/distribution/journal/package-info.java @@ -16,6 +16,6 @@ * specific language governing permissions and limitations * under the License. */ [email protected]("0.2.0") [email protected]("1.0.0") package org.apache.sling.distribution.journal; diff --git a/src/main/protobuf/messages.proto b/src/main/protobuf/messages.proto deleted file mode 100644 index 5ce6e6b..0000000 --- a/src/main/protobuf/messages.proto +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.messages; - -message PackageMessage { - enum ReqType { - ADD = 0; - DELETE = 1; - TEST = 2; - } - required string pubSlingId = 1; // Publisher agent Sling identifier - //required int32 maxRetries = 9; - required ReqType reqType = 4; // The request type - required string pkgId = 3; // Content package identifier - required string pkgType = 11; // The package type - optional int64 pkgLength = 5; - optional bytes pkgBinary = 6; // The content package binary. Either the pkgBinary or the pkgBinaryRef field MUST be set. - optional string pkgBinaryRef = 10; // The reference to the content binary in the shared blob store. Either the pkgBinary or the pkgBinaryRef field MUST be set. - optional string pubAgentName = 2; // Publisher agent name - optional string userId = 12; - repeated string paths = 7; - repeated string deepPaths = 8; -} - -message PackageStatusMessage { - enum Status { - REMOVED_FAILED = 0; // The package has been removed automatically after failing every retry attempts - REMOVED = 1; // The package has been removed explicitly - IMPORTED = 2; // The package has been imported - } - required string subSlingId = 1; // Subscriber agent Sling identifier - required string subAgentName = 2; // Subscriber agent name - required string pubAgentName = 3; // Publisher agent name - required int64 offset = 4; // The offset of the package for which the status is given - required Status status = 5; // The package status -} - - -message SubscriberState { - required string pubAgentName = 1; // Publisher agent name - required int64 offset = 2; // Last processed offset on the Subscriber agent - optional int32 retries = 3; // Nb of retries for the current offset on the Subscriber agent -} - -message SubscriberConfiguration { - required bool editable = 1; - required int32 maxRetries = 2; // The max number of retry attempts to process this package. A value smaller than zero indicates an infinite number of retry attempts. A value greater or equal to zero indicates a specific number of retry attempts. -} - -message DiscoveryMessage { - required string subSlingId = 1; // Subscriber agent Sling identifier - required string subAgentName = 2; // Subscriber agent name - repeated SubscriberState subscriberState = 3; - required SubscriberConfiguration subscriberConfiguration = 4; -} - -message ClearCommand { - required int64 offset = 1; -} - -message CommandMessage { - required string subSlingId = 1; // Subscriber agent Sling identifier - required string subAgentName = 2; // Subscriber agent name - optional ClearCommand clearCommand = 3; - optional string pubAgentName = 4; -} - -message PingMessage { -} diff --git a/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java b/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java deleted file mode 100644 index b093c34..0000000 --- a/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.messages; - -import static org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import java.lang.reflect.Method; - -import org.junit.Test; - -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessageOrBuilder; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberState; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberStateOrBuilder; -import com.google.protobuf.ByteString; -import com.google.protobuf.ExtensionRegistryLite; -import com.google.protobuf.GeneratedMessage; -import com.google.protobuf.InvalidProtocolBufferException; - -public class SerTest { - - @Test - public void test() throws InvalidProtocolBufferException { - DiscoveryMessage discoveryMsg = createMessage(); - byte[] messageBytes = discoveryMsg.toByteArray(); - - ExtensionRegistryLite registry = ExtensionRegistryLite.newInstance(); - DiscoveryMessageOrBuilder messageIn = DiscoveryMessage.parseFrom(messageBytes, registry); - checkMessage(messageIn); - - } - - @Test - public void testFromClass() throws Exception { - DiscoveryMessage discoveryMsg = createMessage(); - ByteString byteString = discoveryMsg.toByteString(); - - ExtensionRegistryLite registry = ExtensionRegistryLite.newInstance(); - Class<? extends GeneratedMessage> type = DiscoveryMessage.class; - Method method = type.getMethod("parseFrom", ByteString.class, ExtensionRegistryLite.class); - DiscoveryMessage messageIn = (DiscoveryMessage) method.invoke(null, byteString, registry); - checkMessage(messageIn); - } - - private DiscoveryMessage createMessage() { - DiscoveryMessage discoveryMsg = DiscoveryMessage.newBuilder() - .setSubSlingId("subscribersling1") - .setSubAgentName("subscriber1") - .setSubscriberConfiguration(SubscriberConfiguration - .newBuilder() - .setEditable(false) - .setMaxRetries(-1) - .build()) - .addSubscriberState(SubscriberState - .newBuilder() - .setPubAgentName("publisher1") - .setOffset(10).build()) - .build(); - return discoveryMsg; - } - - private void checkMessage(DiscoveryMessageOrBuilder messageIn) { - assertThat(messageIn.getSubSlingId(), equalTo("subscribersling1")); - assertThat(messageIn.getSubAgentName(), equalTo("subscriber1")); - SubscriberStateOrBuilder offset = messageIn.getSubscriberStateList().iterator().next(); - assertThat(offset.getPubAgentName(), equalTo("publisher1")); - assertThat(offset.getOffset(), equalTo(10l)); - } -}
