This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new 4c468a103b Add filtered and source fields to typed EventEnvelope
(#2807)
4c468a103b is described below
commit 4c468a103bc39fc4642406a41340e765c635b21e
Author: He-Pin(kerr) <[email protected]>
AuthorDate: Sat Mar 28 17:01:52 2026 +0800
Add filtered and source fields to typed EventEnvelope (#2807)
Add 'filtered' (Boolean) and 'source' (String) fields to the typed
EventEnvelope class to support filtered event delivery and to track where
an event came from (e.g. backtracking queries). Projections need these
fields to handle filtered events and to attribute events to their source.
Changes:
- EventEnvelope: Add filtered/source constructor params, plus a
backward-compatible auxiliary constructor that defaults them to (false, "")
- EventEnvelope companion: Add new apply/create factory methods with
filtered and source parameters
- EventEnvelope.equals: Include filtered field in equality check (source
is not compared)
- EventEnvelope.toString: Include filtered and source in output
- QueryMessages.proto: Add optional bool filtered = 10 and optional
string source = 11 fields
- QuerySerializer: Serialize/deserialize the new fields
- QuerySerializerSpec: Add test for filtered and source round-trip
- Regenerate QueryMessages.java from updated proto definition (gencode
version moves from Protobuf Java 4.33.0 to 4.34.0)
Upstream: akka/akka-core@c9e4fa6f9f
Cherry-picked from akka/akka-core v2.8.0, which is now Apache licensed.
---
.../query/internal/protobuf/QueryMessages.java | 337 ++++++++++++++++++++-
.../src/main/protobuf/QueryMessages.proto | 2 +
.../query/internal/QuerySerializer.scala | 12 +-
.../persistence/query/typed/EventEnvelope.scala | 53 +++-
.../query/internal/QuerySerializerSpec.scala | 14 +
5 files changed, 408 insertions(+), 10 deletions(-)
diff --git a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
index 0b4964a911..bc26bed3e5 100644
--- a/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
+++ b/persistence-query/src/main/java/org/apache/pekko/persistence/query/internal/protobuf/QueryMessages.java
@@ -14,7 +14,7 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// NO CHECKED-IN PROTOBUF GENCODE
// source: QueryMessages.proto
-// Protobuf Java Version: 4.33.0
+// Protobuf Java Version: 4.34.0
package org.apache.pekko.persistence.query.internal.protobuf;
@@ -26,7 +26,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"QueryMessages");
@@ -204,6 +204,41 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
/** <code>optional .Payload metadata = 9;</code> */
org.apache.pekko.remote.ContainerFormats.PayloadOrBuilder
getMetadataOrBuilder();
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return Whether the filtered field is set.
+ */
+ boolean hasFiltered();
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return The filtered.
+ */
+ boolean getFiltered();
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return Whether the source field is set.
+ */
+ boolean hasSource();
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The source.
+ */
+ java.lang.String getSource();
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The bytes for source.
+ */
+ org.apache.pekko.protobufv3.internal.ByteString getSourceBytes();
}
/**
@@ -226,7 +261,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
org.apache.pekko.protobufv3.internal.RuntimeVersion.validateProtobufGencodeVersion(
org.apache.pekko.protobufv3.internal.RuntimeVersion.RuntimeDomain.PUBLIC,
/* major= */ 4,
- /* minor= */ 33,
+ /* minor= */ 34,
/* patch= */ 0,
/* suffix= */ "",
"EventEnvelope");
@@ -243,6 +278,7 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
entityType_ = "";
offset_ = "";
offsetManifest_ = "";
+ source_ = "";
}
public static final
org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
@@ -251,6 +287,12 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
.internal_static_org_apache_pekko_persistence_query_EventEnvelope_descriptor;
}
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
+ return org.apache.pekko.persistence.query.internal.protobuf.QueryMessages
+
.internal_static_org_apache_pekko_persistence_query_EventEnvelope_descriptor;
+ }
+
@java.lang.Override
protected
org.apache.pekko.protobufv3.internal.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
@@ -615,6 +657,83 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
: metadata_;
}
+ public static final int FILTERED_FIELD_NUMBER = 10;
+ private boolean filtered_ = false;
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return Whether the filtered field is set.
+ */
+ @java.lang.Override
+ public boolean hasFiltered() {
+ return ((bitField0_ & 0x00000200) != 0);
+ }
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return The filtered.
+ */
+ @java.lang.Override
+ public boolean getFiltered() {
+ return filtered_;
+ }
+
+ public static final int SOURCE_FIELD_NUMBER = 11;
+
+ @SuppressWarnings("serial")
+ private volatile java.lang.Object source_ = "";
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return Whether the source field is set.
+ */
+ @java.lang.Override
+ public boolean hasSource() {
+ return ((bitField0_ & 0x00000400) != 0);
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The source.
+ */
+ @java.lang.Override
+ public java.lang.String getSource() {
+ java.lang.Object ref = source_;
+ if (ref instanceof java.lang.String) {
+ return (java.lang.String) ref;
+ } else {
+ org.apache.pekko.protobufv3.internal.ByteString bs =
+ (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ source_ = s;
+ }
+ return s;
+ }
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The bytes for source.
+ */
+ @java.lang.Override
+ public org.apache.pekko.protobufv3.internal.ByteString getSourceBytes() {
+ java.lang.Object ref = source_;
+ if (ref instanceof java.lang.String) {
+ org.apache.pekko.protobufv3.internal.ByteString b =
+
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String)
ref);
+ source_ = b;
+ return b;
+ } else {
+ return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ }
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
@@ -699,6 +818,12 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
if (((bitField0_ & 0x00000100) != 0)) {
output.writeMessage(9, getMetadata());
}
+ if (((bitField0_ & 0x00000200) != 0)) {
+ output.writeBool(10, filtered_);
+ }
+ if (((bitField0_ & 0x00000400) != 0)) {
+
org.apache.pekko.protobufv3.internal.GeneratedMessage.writeString(output, 11,
source_);
+ }
getUnknownFields().writeTo(output);
}
@@ -746,6 +871,14 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeMessageSize(
9, getMetadata());
}
+ if (((bitField0_ & 0x00000200) != 0)) {
+ size +=
+
org.apache.pekko.protobufv3.internal.CodedOutputStream.computeBoolSize(10,
filtered_);
+ }
+ if (((bitField0_ & 0x00000400) != 0)) {
+ size +=
+
org.apache.pekko.protobufv3.internal.GeneratedMessage.computeStringSize(11,
source_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSize = size;
return size;
@@ -800,6 +933,14 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
if (hasMetadata()) {
if (!getMetadata().equals(other.getMetadata())) return false;
}
+ if (hasFiltered() != other.hasFiltered()) return false;
+ if (hasFiltered()) {
+ if (getFiltered() != other.getFiltered()) return false;
+ }
+ if (hasSource() != other.hasSource()) return false;
+ if (hasSource()) {
+ if (!getSource().equals(other.getSource())) return false;
+ }
if (!getUnknownFields().equals(other.getUnknownFields())) return false;
return true;
}
@@ -848,6 +989,15 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
hash = (37 * hash) + METADATA_FIELD_NUMBER;
hash = (53 * hash) + getMetadata().hashCode();
}
+ if (hasFiltered()) {
+ hash = (37 * hash) + FILTERED_FIELD_NUMBER;
+ hash =
+ (53 * hash) +
org.apache.pekko.protobufv3.internal.Internal.hashBoolean(getFiltered());
+ }
+ if (hasSource()) {
+ hash = (37 * hash) + SOURCE_FIELD_NUMBER;
+ hash = (53 * hash) + getSource().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -1039,6 +1189,8 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
metadataBuilder_.dispose();
metadataBuilder_ = null;
}
+ filtered_ = false;
+ source_ = "";
return this;
}
@@ -1119,6 +1271,14 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
result.metadata_ = metadataBuilder_ == null ? metadata_ :
metadataBuilder_.build();
to_bitField0_ |= 0x00000100;
}
+ if (((from_bitField0_ & 0x00000200) != 0)) {
+ result.filtered_ = filtered_;
+ to_bitField0_ |= 0x00000200;
+ }
+ if (((from_bitField0_ & 0x00000400) != 0)) {
+ result.source_ = source_;
+ to_bitField0_ |= 0x00000400;
+ }
result.bitField0_ |= to_bitField0_;
}
@@ -1176,6 +1336,14 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
if (other.hasMetadata()) {
mergeMetadata(other.getMetadata());
}
+ if (other.hasFiltered()) {
+ setFiltered(other.getFiltered());
+ }
+ if (other.hasSource()) {
+ source_ = other.source_;
+ bitField0_ |= 0x00000400;
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
onChanged();
return this;
@@ -1288,6 +1456,18 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
bitField0_ |= 0x00000100;
break;
} // case 74
+ case 80:
+ {
+ filtered_ = input.readBool();
+ bitField0_ |= 0x00000200;
+ break;
+ } // case 80
+ case 90:
+ {
+ source_ = input.readBytes();
+ bitField0_ |= 0x00000400;
+ break;
+ } // case 90
default:
{
if (!super.parseUnknownField(input, extensionRegistry, tag))
{
@@ -2074,6 +2254,146 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
return metadataBuilder_;
}
+ private boolean filtered_;
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return Whether the filtered field is set.
+ */
+ @java.lang.Override
+ public boolean hasFiltered() {
+ return ((bitField0_ & 0x00000200) != 0);
+ }
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return The filtered.
+ */
+ @java.lang.Override
+ public boolean getFiltered() {
+ return filtered_;
+ }
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @param value The filtered to set.
+ * @return This builder for chaining.
+ */
+ public Builder setFiltered(boolean value) {
+
+ filtered_ = value;
+ bitField0_ |= 0x00000200;
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>optional bool filtered = 10;</code>
+ *
+ * @return This builder for chaining.
+ */
+ public Builder clearFiltered() {
+ bitField0_ = (bitField0_ & ~0x00000200);
+ filtered_ = false;
+ onChanged();
+ return this;
+ }
+
+ private java.lang.Object source_ = "";
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return Whether the source field is set.
+ */
+ public boolean hasSource() {
+ return ((bitField0_ & 0x00000400) != 0);
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The source.
+ */
+ public java.lang.String getSource() {
+ java.lang.Object ref = source_;
+ if (!(ref instanceof java.lang.String)) {
+ org.apache.pekko.protobufv3.internal.ByteString bs =
+ (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ java.lang.String s = bs.toStringUtf8();
+ if (bs.isValidUtf8()) {
+ source_ = s;
+ }
+ return s;
+ } else {
+ return (java.lang.String) ref;
+ }
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return The bytes for source.
+ */
+ public org.apache.pekko.protobufv3.internal.ByteString getSourceBytes() {
+ java.lang.Object ref = source_;
+ if (ref instanceof String) {
+ org.apache.pekko.protobufv3.internal.ByteString b =
+
org.apache.pekko.protobufv3.internal.ByteString.copyFromUtf8((java.lang.String)
ref);
+ source_ = b;
+ return b;
+ } else {
+ return (org.apache.pekko.protobufv3.internal.ByteString) ref;
+ }
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @param value The source to set.
+ * @return This builder for chaining.
+ */
+ public Builder setSource(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ source_ = value;
+ bitField0_ |= 0x00000400;
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @return This builder for chaining.
+ */
+ public Builder clearSource() {
+ source_ = getDefaultInstance().getSource();
+ bitField0_ = (bitField0_ & ~0x00000400);
+ onChanged();
+ return this;
+ }
+
+ /**
+ * <code>optional string source = 11;</code>
+ *
+ * @param value The bytes for source to set.
+ * @return This builder for chaining.
+ */
+ public Builder
setSourceBytes(org.apache.pekko.protobufv3.internal.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ source_ = value;
+ bitField0_ |= 0x00000400;
+ onChanged();
+ return this;
+ }
+
//
@@protoc_insertion_point(builder_scope:org.apache.pekko.persistence.query.EventEnvelope)
}
@@ -2140,13 +2460,13 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
return descriptor;
}
- private static
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
+ private static final
org.apache.pekko.protobufv3.internal.Descriptors.FileDescriptor descriptor;
static {
java.lang.String[] descriptorData = {
"\n"
+ "\023QueryMessages.proto\022\"org.apache.pekko."
- + "persistence.query\032\026ContainerFormats.proto\"\321\001\n\r"
+ + "persistence.query\032\026ContainerFormats.proto\"\363\001\n\r"
+ "EventEnvelope\022\026\n"
+ "\016persistence_id\030\001 \002(\t\022\023\n"
+ "\013entity_type\030\002 \002(\t\022\r\n"
@@ -2156,7 +2476,10 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
+ "\006offset\030\006 \002(\t\022\027\n"
+ "\017offset_manifest\030\007 \002(\t\022\027\n"
+ "\005event\030\010 \001(\0132\010.Payload\022\032\n"
- + "\010metadata\030\t \001(\0132\010.PayloadB8\n"
+ + "\010metadata\030\t \001(\0132\010.Payload\022\020\n"
+ + "\010filtered\030\n"
+ + " \001(\010\022\016\n"
+ + "\006source\030\013 \001(\tB8\n"
+ "4org.apache.pekko.persistence.query.internal.protobufH\001"
};
descriptor =
@@ -2181,6 +2504,8 @@ public final class QueryMessages extends
org.apache.pekko.protobufv3.internal.Ge
"OffsetManifest",
"Event",
"Metadata",
+ "Filtered",
+ "Source",
});
descriptor.resolveAllFeaturesImmutable();
org.apache.pekko.remote.ContainerFormats.getDescriptor();
diff --git a/persistence-query/src/main/protobuf/QueryMessages.proto b/persistence-query/src/main/protobuf/QueryMessages.proto
index 25291b4c80..f3c3d192f7 100644
--- a/persistence-query/src/main/protobuf/QueryMessages.proto
+++ b/persistence-query/src/main/protobuf/QueryMessages.proto
@@ -30,4 +30,6 @@ message EventEnvelope {
required string offset_manifest = 7;
optional Payload event = 8;
optional Payload metadata = 9;
+ optional bool filtered = 10;
+ optional string source = 11;
}
diff --git a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
index f024f50aaf..6e0538bf27 100644
--- a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
+++ b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/internal/QuerySerializer.scala
@@ -86,6 +86,11 @@ import pekko.serialization.Serializers
env.eventOption.foreach(event => builder.setEvent(payloadBuilder(event,
serialization, log)))
env.eventMetadata.foreach(meta =>
builder.setMetadata(payloadBuilder(meta, serialization, log)))
+ if (env.filtered)
+ builder.setFiltered(true)
+ if (env.source.nonEmpty)
+ builder.setSource(env.source)
+
builder.build().toByteArray()
case offset: Offset =>
@@ -108,6 +113,9 @@ import pekko.serialization.Serializers
if (env.hasMetadata) Option(deserializePayload(env.getMetadata,
serialization))
else None
+ val filtered = env.hasFiltered && env.getFiltered
+ val source = if (env.hasSource) env.getSource else ""
+
new EventEnvelope(
offset,
env.getPersistenceId,
@@ -116,7 +124,9 @@ import pekko.serialization.Serializers
env.getTimestamp,
metaOption,
env.getEntityType,
- env.getSlice)
+ env.getSlice,
+ filtered,
+ source)
case _ =>
fromStorageRepresentation(new String(bytes, UTF_8), manifest)
diff --git a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
index 0a48ee0e3a..551d46526d 100644
--- a/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
+++ b/persistence-query/src/main/scala/org/apache/pekko/persistence/query/typed/EventEnvelope.scala
@@ -31,6 +31,28 @@ object EventEnvelope {
slice: Int): EventEnvelope[Event] =
new EventEnvelope(offset, persistenceId, sequenceNr, Option(event),
timestamp, None, entityType, slice)
+ def apply[Event](
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ event: Event,
+ timestamp: Long,
+ entityType: String,
+ slice: Int,
+ filtered: Boolean,
+ source: String): EventEnvelope[Event] =
+ new EventEnvelope(
+ offset,
+ persistenceId,
+ sequenceNr,
+ Option(event),
+ timestamp,
+ None,
+ entityType,
+ slice,
+ filtered,
+ source)
+
def create[Event](
offset: Offset,
persistenceId: String,
@@ -41,6 +63,18 @@ object EventEnvelope {
slice: Int): EventEnvelope[Event] =
apply(offset, persistenceId, sequenceNr, event, timestamp, entityType,
slice)
+ def create[Event](
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ event: Event,
+ timestamp: Long,
+ entityType: String,
+ slice: Int,
+ filtered: Boolean,
+ source: String): EventEnvelope[Event] =
+ apply(offset, persistenceId, sequenceNr, event, timestamp, entityType,
slice, filtered, source)
+
def unapply[Event](arg: EventEnvelope[Event]): Option[(Offset, String, Long,
Option[Event], Long)] =
Some((arg.offset, arg.persistenceId, arg.sequenceNr, arg.eventOption,
arg.timestamp))
}
@@ -68,7 +102,20 @@ final class EventEnvelope[Event](
val timestamp: Long,
val eventMetadata: Option[Any],
val entityType: String,
- val slice: Int) {
+ val slice: Int,
+ val filtered: Boolean,
+ val source: String) {
+
+ def this(
+ offset: Offset,
+ persistenceId: String,
+ sequenceNr: Long,
+ eventOption: Option[Event],
+ timestamp: Long,
+ eventMetadata: Option[Any],
+ entityType: String,
+ slice: Int) =
+ this(offset, persistenceId, sequenceNr, eventOption, timestamp,
eventMetadata, entityType, slice, false, "")
def event: Event =
eventOption match {
@@ -117,7 +164,7 @@ final class EventEnvelope[Event](
case other: EventEnvelope[_] =>
offset == other.offset && persistenceId == other.persistenceId &&
sequenceNr == other.sequenceNr &&
eventOption == other.eventOption && timestamp == other.timestamp &&
eventMetadata == other.eventMetadata &&
- entityType == other.entityType && slice == other.slice
+ entityType == other.entityType && slice == other.slice && filtered ==
other.filtered
case _ => false
}
@@ -130,6 +177,6 @@ final class EventEnvelope[Event](
case Some(meta) => meta.getClass.getName
case None => ""
}
-
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice)"
+
s"EventEnvelope($offset,$persistenceId,$sequenceNr,$eventStr,$timestamp,$metaStr,$entityType,$slice,$filtered,$source)"
}
}
diff --git a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
index cd6e07f055..a3b39cce7e 100644
--- a/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
+++ b/persistence-query/src/test/scala/org/apache/pekko/persistence/query/internal/QuerySerializerSpec.scala
@@ -77,6 +77,20 @@ class QuerySerializerSpec extends PekkoSpec {
EventEnvelope(timeUuidOffset, "TestEntity|id1", 3L, "event1",
System.currentTimeMillis(), "TestEntity", 5))
}
+ "serialize EventEnvelope with filtered and source" in {
+ verifySerialization(
+ EventEnvelope(
+ Sequence(1L),
+ "TestEntity|id1",
+ 3L,
+ "event1",
+ System.currentTimeMillis(),
+ "TestEntity",
+ 5,
+ filtered = true,
+ source = "backtracking"))
+ }
+
"serialize Sequence Offset" in {
verifySerialization(Sequence(0))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]