This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git
The following commit(s) were added to refs/heads/master by this push:
new 018ffcb SLING-9504 - Migrate to json (#2)
018ffcb is described below
commit 018ffcb57acd3a8bac485d0cc109876aef7e8c77
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jun 18 17:16:08 2020 +0200
SLING-9504 - Migrate to json (#2)
---
pom.xml | 56 ++++----------
.../sling/distribution/journal/FullMessage.java | 4 +-
.../sling/distribution/journal/HandlerAdapter.java | 36 +++------
.../sling/distribution/journal/MessageSender.java | 17 +++--
.../distribution/journal/MessagingProvider.java | 28 ++-----
...geDistributedMessage.java => ClearCommand.java} | 29 +++++---
...stributedMessage.java => DiscoveryMessage.java} | 29 +++++---
.../messages/PackageDistributedMessage.java | 9 +++
...DistributedMessage.java => PackageMessage.java} | 41 +++++++---
.../journal/messages/PackageStatusMessage.java | 71 ++++++++++++++++++
.../PingMessage.java} | 17 +++--
...stributedMessage.java => SubscriberConfig.java} | 25 ++++---
...istributedMessage.java => SubscriberState.java} | 27 ++++---
.../sling/distribution/journal/messages/Types.java | 51 -------------
.../journal/messages/package-info.java | 2 +-
.../sling/distribution/journal/package-info.java | 2 +-
src/main/protobuf/messages.proto | 85 ---------------------
.../journal/messages/PackageMessageTest.java | 57 ++++++++++++++
.../distribution/journal/messages/SerTest.java | 87 ----------------------
src/test/resources/serialized.json | 1 +
20 files changed, 300 insertions(+), 374 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3c40f7f..85d7595 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
<modelVersion>4.0.0</modelVersion>
<!--
======================================================================= -->
- <!-- P A R E N T P R O J E C T
-->
+ <!-- P A R E N T P R O J E C T -->
<!--
======================================================================= -->
<parent>
<groupId>org.apache.sling</groupId>
@@ -31,10 +31,10 @@
</parent>
<!--
======================================================================= -->
- <!-- P R O J E C T
-->
+ <!-- P R O J E C T -->
<!--
======================================================================= -->
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.1.9-SNAPSHOT</version>
+ <version>0.2.0-SNAPSHOT</version>
<name>Apache Sling Journal based Content Distribution - Messages
bundle</name>
<description>Implementation of the messages to support Apache Sling
Content Distribution on top of an append-only persisted log</description>
@@ -47,50 +47,18 @@
<connection>scm:git:https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git</connection>
<developerConnection>scm:git:https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git</developerConnection>
<url>https://gitbox.apache.org/repos/asf?p=sling-org-apache-sling-distribution-journal-messages.git</url>
- <tag>org.apache.sling.distribution.journal.messages-0.1.2</tag>
- </scm>
+ </scm>
<!--
======================================================================= -->
- <!-- B U I L D
-->
- <!--
======================================================================= -->
- <build>
- <plugins>
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>3.5.1.1</version>
- <executions>
- <execution>
- <phase>generate-sources</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <protocVersion>2.5.0</protocVersion>
- <outputTargets>
- <outputTarget>
- <type>java</type>
- <addSources>true</addSources>
-
<outputDirectory>target/generated-sources/protoc-jar</outputDirectory>
- </outputTarget>
- </outputTargets>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <!--
======================================================================= -->
- <!-- D E P E N D E N C I E S
-->
+ <!-- D E P E N D E N C I E S -->
<!--
======================================================================= -->
<dependencies>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>3.7.0</version>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.12</version>
+ <scope>provided</scope>
</dependency>
-
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.cmpn</artifactId>
@@ -125,5 +93,11 @@
<version>1.2.3</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.7</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/src/main/java/org/apache/sling/distribution/journal/FullMessage.java
b/src/main/java/org/apache/sling/distribution/journal/FullMessage.java
index 444c209..b3efd88 100644
--- a/src/main/java/org/apache/sling/distribution/journal/FullMessage.java
+++ b/src/main/java/org/apache/sling/distribution/journal/FullMessage.java
@@ -18,9 +18,7 @@
*/
package org.apache.sling.distribution.journal;
-import com.google.protobuf.GeneratedMessage;
-
-public class FullMessage<T extends GeneratedMessage> {
+public class FullMessage<T> {
private MessageInfo info;
private T message;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java
b/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java
index 1b59c78..fc9a2a0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java
+++ b/src/main/java/org/apache/sling/distribution/journal/HandlerAdapter.java
@@ -18,40 +18,17 @@
*/
package org.apache.sling.distribution.journal;
-import java.lang.reflect.Method;
-
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistryLite;
-
public class HandlerAdapter<T> {
- private final MessageHandler<T> handler;
- private final Method method;
- private final ExtensionRegistryLite registry;
private final Class<T> type;
+ private final MessageHandler<T> handler;
- public HandlerAdapter(Class<T> type, MessageHandler<T> handler) {
- this.type = type;
- this.handler = handler;
- try {
- method = type.getMethod("parseFrom", ByteString.class,
ExtensionRegistryLite.class);
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
- registry = ExtensionRegistryLite.newInstance();
- }
-
public static <T> HandlerAdapter<T> create(Class<T> type,
MessageHandler<T> handler) {
return new HandlerAdapter<>(type, handler);
}
- @SuppressWarnings("unchecked")
- private T parseFrom(ByteString payloadBytes) throws Exception {
- return (T) method.invoke(null, payloadBytes, registry);
- }
-
- public void handle(MessageInfo info, ByteString payloadBytes) throws
Exception {
- T payload = parseFrom(payloadBytes);
- handler.handle(info, payload);
+ private HandlerAdapter(Class<T> type, MessageHandler<T> handler) {
+ this.type = type;
+ this.handler = handler;
}
public Class<?> getType() {
@@ -61,4 +38,9 @@ public class HandlerAdapter<T> {
public MessageHandler<T> getHandler() {
return this.handler;
}
+
+ @Override
+ public String toString() {
+ return "Message handler for type=" + type.getName() + ", handler=" +
handler.getClass().getName();
+ }
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
index 0a7f668..6b149c9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
@@ -19,13 +19,20 @@
package org.apache.sling.distribution.journal;
import java.util.Map;
+import java.util.function.Consumer;
-import com.google.protobuf.GeneratedMessage;
+public interface MessageSender<T> extends Consumer<T> {
-public interface MessageSender<T extends GeneratedMessage> {
-
- void send(String topic, T payload) throws MessagingException;
+ /**
+ * Make sure every MessageSender also acts as Consumer
+ */
+ @Override
+ default void accept(T payload) {
+ send(payload);
+ }
+
+ void send(T payload) throws MessagingException;
- void send(String topic, T payload, Map<String, String> properties) throws
MessagingException;
+ void send(T payload, Map<String, String> properties) throws
MessagingException;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
index c4d49f9..cf8cbbe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
@@ -19,34 +19,20 @@
package org.apache.sling.distribution.journal;
import java.io.Closeable;
-import java.util.function.Consumer;
-
-import com.google.protobuf.GeneratedMessage;
public interface MessagingProvider {
- <T extends GeneratedMessage> MessageSender<T> createSender();
-
- /**
- * Create a sender for a fixed topic
- *
- * @param <T>
- * @param topic
- * @return
- */
- default <T extends GeneratedMessage> Consumer<T> createSender(String
topic) {
- MessageSender<GeneratedMessage> sender = createSender();
- return payload -> sender.send(topic, payload);
- }
+ <T> MessageSender<T> createSender(String topic);
- <T> Closeable createPoller(String topicName, Reset reset,
HandlerAdapter<?>... adapters);
+ default Closeable createPoller(
+ String topicName,
+ Reset reset,
+ HandlerAdapter<?> ... adapters) {
+ return createPoller(topicName, reset, null, adapters);
+ }
Closeable createPoller(String topicName, Reset reset, String assign,
HandlerAdapter<?>... adapters);
- <T> JsonMessageSender<T> createJsonSender();
-
- <T> Closeable createJsonPoller(String topicName, Reset reset,
MessageHandler<T> handler, Class<T> type);
-
void assertTopic(String topic) throws MessagingException;
long retrieveOffset(String topicName, Reset reset);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java
similarity index 70%
copy from
src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
copy to
src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java
index b26dea6..42b6d51 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/ClearCommand.java
@@ -18,15 +18,24 @@
*/
package org.apache.sling.distribution.journal.messages;
-public class PackageDistributedMessage {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public String pubAgentName;
-
- public String packageId;
-
- public String[] deepPaths;
-
- public String[] paths;
-
- public long offset;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class ClearCommand {
+
+ // Subscriber agent Sling identifier
+ String subSlingId;
+
+ // Subscriber agent name
+ String subAgentName;
+
+ String pubAgentName;
+
+ long offset;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java
similarity index 66%
copy from
src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
copy to
src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java
index b26dea6..1cce06b 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/DiscoveryMessage.java
@@ -18,15 +18,26 @@
*/
package org.apache.sling.distribution.journal.messages;
-public class PackageDistributedMessage {
+import java.util.List;
- public String pubAgentName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public String packageId;
-
- public String[] deepPaths;
-
- public String[] paths;
-
- public long offset;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DiscoveryMessage {
+
+ // Subscriber agent Sling identifier
+ String subSlingId;
+
+ // Subscriber agent name
+ String subAgentName;
+
+ SubscriberConfig subscriberConfiguration;
+
+ List<SubscriberState> subscriberStates;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
index b26dea6..463a9a4 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
@@ -18,6 +18,15 @@
*/
package org.apache.sling.distribution.journal.messages;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class PackageDistributedMessage {
public String pubAgentName;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java
similarity index 56%
copy from
src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
copy to
src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java
index b26dea6..0802541 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageMessage.java
@@ -18,15 +18,38 @@
*/
package org.apache.sling.distribution.journal.messages;
-public class PackageDistributedMessage {
+import java.util.ArrayList;
+import java.util.List;
- public String pubAgentName;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public String packageId;
-
- public String[] deepPaths;
-
- public String[] paths;
-
- public long offset;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PackageMessage {
+ String pubSlingId;
+ ReqType reqType;
+ String pkgId;
+ String pkgType;
+ long pkgLength;
+ byte[] pkgBinary;
+ String pkgBinaryRef;
+ String pubAgentName;
+ String userId;
+
+ @Builder.Default
+ List<String> paths = new ArrayList<>();
+
+ @Builder.Default
+ List<String> deepPaths = new ArrayList<>();
+
+ public enum ReqType {
+ ADD,
+ DELETE,
+ TEST;
+ }
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java
new file mode 100644
index 0000000..b8e5b15
--- /dev/null
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/PackageStatusMessage.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.messages;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PackageStatusMessage {
+
+ String subSlingId;
+ String subAgentName;
+ String pubAgentName;
+ long offset;
+ PackageStatusMessage.Status status;
+
+ public enum Status {
+ /**
+ * The package has been removed automatically after failing every
retry attempts
+ */
+ REMOVED_FAILED(0),
+ /**
+ * The package has been removed explicitly
+ */
+ REMOVED(1),
+ /**
+ * The package has been imported
+ */
+ IMPORTED(2);
+
+ private int number;
+
+ Status(int number) {
+ this.number = number;
+ }
+
+ public int getNumber() {
+ return number;
+ }
+
+ public static Status fromNumber(int number) {
+ switch (number) {
+ case 0: return REMOVED_FAILED;
+ case 1: return REMOVED;
+ case 2: return IMPORTED;
+ }
+ throw new IllegalStateException("Unknown number " + number);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java
b/src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java
similarity index 75%
rename from
src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java
rename to
src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java
index 82a4813..0fab986 100644
--- a/src/main/java/org/apache/sling/distribution/journal/JsonMessageSender.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/PingMessage.java
@@ -16,10 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.distribution.journal;
+package org.apache.sling.distribution.journal.messages;
-public interface JsonMessageSender<T> {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- void send(String topic, T payload);
-
-}
\ No newline at end of file
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PingMessage {
+ String message;
+}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java
similarity index 64%
copy from
src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
copy to
src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java
index b26dea6..10f6fa5 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberConfig.java
@@ -18,15 +18,22 @@
*/
package org.apache.sling.distribution.journal.messages;
-public class PackageDistributedMessage {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public String pubAgentName;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SubscriberConfig {
+ boolean editable;
- public String packageId;
-
- public String[] deepPaths;
-
- public String[] paths;
-
- public long offset;
+ /**
+ * The max number of retry attempts to process this package. A value
smaller
+ * than zero indicates an infinite number of retry attempts. A value
greater or
+ * equal to zero indicates a specific number of retry attempts.
+ */
+ int maxRetries;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java
similarity index 67%
copy from
src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
copy to
src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java
index b26dea6..1513f21 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/PackageDistributedMessage.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/SubscriberState.java
@@ -18,15 +18,22 @@
*/
package org.apache.sling.distribution.journal.messages;
-public class PackageDistributedMessage {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- public String pubAgentName;
-
- public String packageId;
-
- public String[] deepPaths;
-
- public String[] paths;
-
- public long offset;
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class SubscriberState {
+ // Publisher agent name
+ String pubAgentName;
+
+ // Last processed offset on the Subscriber agent
+ long offset;
+
+ // Nb of retries for the current offset on the Subscriber agent
+ int retries;
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/Types.java
b/src/main/java/org/apache/sling/distribution/journal/messages/Types.java
deleted file mode 100644
index f49a103..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/messages/Types.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.messages;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class Types {
- private static Map<Class<?>, Integer> types = new HashMap<>();
- static {
- types.put(Messages.DiscoveryMessage.class, 1);
- types.put(Messages.PackageMessage.class, 2);
- types.put(Messages.PackageStatusMessage.class, 3);
- types.put(Messages.CommandMessage.class, 4);
- types.put(Messages.PingMessage.class, 5);
- }
-
- public static Class<?> getType(int type, int version) {
- for (Class<?> clazz : types.keySet()) {
- Integer cType = types.get(clazz);
- if (cType == type && version == 1) {
- return clazz;
- }
- }
- throw new IllegalArgumentException(String.format("Unknown type %d and
version %d", type, version));
- }
-
- public static Integer getType(Class<?> clazz) {
- return types.get(clazz);
- }
-
- public static Integer getVersion(Class<?> clazz) {
- return 1;
- }
-}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java
b/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java
index f69a41d..093cdc1 100644
---
a/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java
+++
b/src/main/java/org/apache/sling/distribution/journal/messages/package-info.java
@@ -16,6 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
[email protected]("0.1.0")
[email protected]("1.0.0")
package org.apache.sling.distribution.journal.messages;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/package-info.java
b/src/main/java/org/apache/sling/distribution/journal/package-info.java
index 444f520..43650db 100644
--- a/src/main/java/org/apache/sling/distribution/journal/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/journal/package-info.java
@@ -16,6 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
[email protected]("0.2.0")
[email protected]("1.0.0")
package org.apache.sling.distribution.journal;
diff --git a/src/main/protobuf/messages.proto b/src/main/protobuf/messages.proto
deleted file mode 100644
index 5ce6e6b..0000000
--- a/src/main/protobuf/messages.proto
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.messages;
-
-message PackageMessage {
- enum ReqType {
- ADD = 0;
- DELETE = 1;
- TEST = 2;
- }
- required string pubSlingId = 1; // Publisher agent Sling identifier
- //required int32 maxRetries = 9;
- required ReqType reqType = 4; // The request type
- required string pkgId = 3; // Content package identifier
- required string pkgType = 11; // The package type
- optional int64 pkgLength = 5;
- optional bytes pkgBinary = 6; // The content package binary. Either
the pkgBinary or the pkgBinaryRef field MUST be set.
- optional string pkgBinaryRef = 10; // The reference to the content binary
in the shared blob store. Either the pkgBinary or the pkgBinaryRef field MUST
be set.
- optional string pubAgentName = 2; // Publisher agent name
- optional string userId = 12;
- repeated string paths = 7;
- repeated string deepPaths = 8;
-}
-
-message PackageStatusMessage {
- enum Status {
- REMOVED_FAILED = 0; // The package has been
removed automatically after failing every retry attempts
- REMOVED = 1; // The package has been
removed explicitly
- IMPORTED = 2; // The package has been
imported
- }
- required string subSlingId = 1; // Subscriber agent Sling
identifier
- required string subAgentName = 2; // Subscriber agent name
- required string pubAgentName = 3; // Publisher agent name
- required int64 offset = 4; // The offset of the package
for which the status is given
- required Status status = 5; // The package status
-}
-
-
-message SubscriberState {
- required string pubAgentName = 1; // Publisher agent name
- required int64 offset = 2; // Last processed offset on the
Subscriber agent
- optional int32 retries = 3; // Nb of retries for the current
offset on the Subscriber agent
-}
-
-message SubscriberConfiguration {
- required bool editable = 1;
- required int32 maxRetries = 2; // The max number of retry attempts to
process this package. A value smaller than zero indicates an infinite number of
retry attempts. A value greater or equal to zero indicates a specific number of
retry attempts.
-}
-
-message DiscoveryMessage {
- required string subSlingId = 1; // Subscriber agent Sling
identifier
- required string subAgentName = 2; // Subscriber agent name
- repeated SubscriberState subscriberState = 3;
- required SubscriberConfiguration subscriberConfiguration = 4;
-}
-
-message ClearCommand {
- required int64 offset = 1;
-}
-
-message CommandMessage {
- required string subSlingId = 1; // Subscriber agent Sling
identifier
- required string subAgentName = 2; // Subscriber agent name
- optional ClearCommand clearCommand = 3;
- optional string pubAgentName = 4;
-}
-
-message PingMessage {
-}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/messages/PackageMessageTest.java
b/src/test/java/org/apache/sling/distribution/journal/messages/PackageMessageTest.java
new file mode 100644
index 0000000..90d2617
--- /dev/null
+++
b/src/test/java/org/apache/sling/distribution/journal/messages/PackageMessageTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.messages;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
+public class PackageMessageTest {
+
+ @Test
+ public void testSerialize() throws JsonGenerationException,
JsonMappingException, IOException {
+ byte[] pkgBinary = new String("dummy").getBytes();
+ PackageMessage message = PackageMessage.builder()
+ .paths(Collections.singletonList("/test"))
+ .pkgBinary(pkgBinary)
+ .build();
+ ObjectWriter writer = new
ObjectMapper().writerFor(PackageMessage.class);
+ StringWriter outWriter = new StringWriter();
+ writer.writeValue(outWriter, message);
+ String serialized = outWriter.getBuffer().toString();
+ Path path = Paths.get("src/test/resources/serialized.json");
+ String expected = Files.lines(path,
StandardCharsets.UTF_8).collect(Collectors.joining());
+ Assert.assertThat(serialized, equalTo(expected));
+ }
+}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java
b/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java
deleted file mode 100644
index b093c34..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/messages/SerTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.messages;
-
-import static
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import java.lang.reflect.Method;
-
-import org.junit.Test;
-
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessageOrBuilder;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import
org.apache.sling.distribution.journal.messages.Messages.SubscriberStateOrBuilder;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistryLite;
-import com.google.protobuf.GeneratedMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-public class SerTest {
-
- @Test
- public void test() throws InvalidProtocolBufferException {
- DiscoveryMessage discoveryMsg = createMessage();
- byte[] messageBytes = discoveryMsg.toByteArray();
-
- ExtensionRegistryLite registry = ExtensionRegistryLite.newInstance();
- DiscoveryMessageOrBuilder messageIn =
DiscoveryMessage.parseFrom(messageBytes, registry);
- checkMessage(messageIn);
-
- }
-
- @Test
- public void testFromClass() throws Exception {
- DiscoveryMessage discoveryMsg = createMessage();
- ByteString byteString = discoveryMsg.toByteString();
-
- ExtensionRegistryLite registry = ExtensionRegistryLite.newInstance();
- Class<? extends GeneratedMessage> type = DiscoveryMessage.class;
- Method method = type.getMethod("parseFrom", ByteString.class,
ExtensionRegistryLite.class);
- DiscoveryMessage messageIn = (DiscoveryMessage) method.invoke(null,
byteString, registry);
- checkMessage(messageIn);
- }
-
- private DiscoveryMessage createMessage() {
- DiscoveryMessage discoveryMsg = DiscoveryMessage.newBuilder()
- .setSubSlingId("subscribersling1")
- .setSubAgentName("subscriber1")
- .setSubscriberConfiguration(SubscriberConfiguration
- .newBuilder()
- .setEditable(false)
- .setMaxRetries(-1)
- .build())
- .addSubscriberState(SubscriberState
- .newBuilder()
- .setPubAgentName("publisher1")
- .setOffset(10).build())
- .build();
- return discoveryMsg;
- }
-
- private void checkMessage(DiscoveryMessageOrBuilder messageIn) {
- assertThat(messageIn.getSubSlingId(), equalTo("subscribersling1"));
- assertThat(messageIn.getSubAgentName(), equalTo("subscriber1"));
- SubscriberStateOrBuilder offset =
messageIn.getSubscriberStateList().iterator().next();
- assertThat(offset.getPubAgentName(), equalTo("publisher1"));
- assertThat(offset.getOffset(), equalTo(10l));
- }
-}
diff --git a/src/test/resources/serialized.json
b/src/test/resources/serialized.json
new file mode 100644
index 0000000..a67341b
--- /dev/null
+++ b/src/test/resources/serialized.json
@@ -0,0 +1 @@
+{"pubSlingId":null,"reqType":null,"pkgId":null,"pkgType":null,"pkgLength":0,"pkgBinary":"ZHVtbXk=","pkgBinaryRef":null,"pubAgentName":null,"userId":null,"paths":["/test"],"deepPaths":[]}
\ No newline at end of file