This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a3e8fd1266 IGNITE-21904 Mechanism to provide custom Event
implementations to register EventSerializer (#3606)
a3e8fd1266 is described below
commit a3e8fd1266334aa09e3789f15db2262a60057578
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Tue Apr 16 12:29:06 2024 +0300
IGNITE-21904 Mechanism to provide custom Event implementations to register
EventSerializer (#3606)
---
.../apache/ignite/internal/eventlog/api/Event.java | 15 +++--
.../ignite/internal/eventlog/event/EventImpl.java | 13 ++---
.../internal/eventlog/impl/EventLogImpl.java | 5 +-
.../ignite/internal/eventlog/impl/LogSink.java | 5 +-
.../impl/{SinkFactory.java => LogSinkFactory.java} | 18 +++---
.../ignite/internal/eventlog/impl/SinkFactory.java | 9 +--
.../eventlog/ser/EventSerializerFactory.java | 30 ++++++++++
...alizer.java => JacksonBasedJsonSerializer.java} | 50 ++++------------
.../internal/eventlog/event/EventBuilderTest.java | 30 +++++-----
.../internal/eventlog/event/IgniteEventsTest.java | 4 +-
.../ConfigurationBasedChannelRegistryTest.java | 6 +-
.../impl/ConfigurationBasedSinkRegistryTest.java | 3 +-
.../internal/eventlog/impl/EventLogTest.java | 2 +-
.../ignite/internal/eventlog/impl/LogSinkTest.java | 8 ++-
.../ignite/internal/eventlog/ser/CustomEvent.java | 67 ++++++++++++++++++++++
.../internal/eventlog/ser/CustomEventBuilder.java | 51 ++++++++++++++++
...st.java => JacksonBasedJsonSerializerTest.java} | 39 ++++++++++++-
.../ignite/internal/eventlog/ser/Message.java | 36 ++++++++++++
18 files changed, 294 insertions(+), 97 deletions(-)
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
index 8e5f4f6f57..ccc7a78c02 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
@@ -22,7 +22,10 @@ import
org.apache.ignite.internal.eventlog.event.EventBuilder;
import org.apache.ignite.internal.eventlog.event.EventTypeRegistry;
import org.apache.ignite.internal.eventlog.event.EventUser;
-/** Represents an event object that can be logged to the event log. */
+/**
+ * Represents an event object that can be logged to the event log.
+ * All implementations of this interface must be plain POJO.
+ **/
public interface Event {
/** Default builder for the event object. */
static EventBuilder builder() {
@@ -30,17 +33,17 @@ public interface Event {
}
/** The type of the event. The type must be registered in the {@link
EventTypeRegistry}. */
- String type();
+ String getType();
/** The unix timestamp of the event. */
- long timestamp();
+ long getTimestamp();
/** The product version. The version is compatible with semver. */
- String productVersion();
+ String getProductVersion();
/** The user that caused the event. If the user is not available, the
method returns a system user. */
- EventUser user();
+ EventUser getUser();
/** The event-specific fields of the event. */
- Map<String, Object> fields();
+ Map<String, Object> getFields();
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
index c5691f2aeb..fa0918ae97 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
@@ -20,13 +20,12 @@ package org.apache.ignite.internal.eventlog.event;
import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.eventlog.api.Event;
-import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
/**
* Implementation of the {@link Event} interface. The class is immutable and
thread-safe. If you want to create an instance of this class,
* use the {@link EventBuilder}.
*
- * <p>NOTE: If you rename/add any field in this class, you should also update
the {@link JsonEventSerializer}.
+ * <p>NOTE: This class should always be a plain POJO.
*/
public class EventImpl implements Event {
private final String type;
@@ -48,27 +47,27 @@ public class EventImpl implements Event {
}
@Override
- public String type() {
+ public String getType() {
return type;
}
@Override
- public long timestamp() {
+ public long getTimestamp() {
return timestamp;
}
@Override
- public String productVersion() {
+ public String getProductVersion() {
return productVersion;
}
@Override
- public EventUser user() {
+ public EventUser getUser() {
return user;
}
@Override
- public Map<String, Object> fields() {
+ public Map<String, Object> getFields() {
return fields;
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
index 487a43374b..16122366c5 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.eventlog.api.Event;
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
/**
* Implementation of the {@link EventLog} interface.
@@ -45,7 +46,7 @@ public class EventLogImpl implements EventLog {
* @param cfg the configuration.
*/
public EventLogImpl(EventLogConfiguration cfg) {
- this(cfg, SinkFactory.DEFAULT);
+ this(cfg, new LogSinkFactory(new
EventSerializerFactory().createEventSerializer()));
}
public EventLogImpl(EventLogConfiguration cfg, SinkFactory sinkFactory) {
@@ -55,7 +56,7 @@ public class EventLogImpl implements EventLog {
@Override
public void log(Supplier<Event> eventProvider) {
Event event = eventProvider.get();
- Set<EventChannel> channel =
channelRegistry.findAllChannelsByEventType(event.type());
+ Set<EventChannel> channel =
channelRegistry.findAllChannelsByEventType(event.getType());
channel.forEach(c -> c.log(event));
}
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
index c2aa43ff0b..75df0614d1 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.eventlog.api.Event;
import org.apache.ignite.internal.eventlog.api.Sink;
import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
import org.apache.ignite.internal.eventlog.ser.EventSerializer;
-import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
/** Sink that writes events to the log using any logging framework the user
has configured. */
class LogSink implements Sink {
@@ -31,10 +30,10 @@ class LogSink implements Sink {
private final EventSerializer serializer;
private final String level;
- LogSink(LogSinkView cfg) {
+ LogSink(LogSinkView cfg, EventSerializer eventSerializer) {
this.level = cfg.level();
this.logger = System.getLogger(cfg.criteria());
- this.serializer = new JsonEventSerializer();
+ this.serializer = eventSerializer;
}
/** {@inheritDoc} */
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
similarity index 73%
copy from
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
copy to
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
index 84650e68fb..3a458ab06b 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
@@ -20,14 +20,17 @@ package org.apache.ignite.internal.eventlog.impl;
import org.apache.ignite.internal.eventlog.api.Sink;
import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
import org.apache.ignite.internal.eventlog.config.schema.SinkView;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.internal.eventlog.ser.EventSerializer;
/**
* Factory for creating sink instances.
*/
-interface SinkFactory {
- SinkFactory DEFAULT = new SinkFactory() {};
+class LogSinkFactory implements SinkFactory {
+ private final EventSerializer eventSerializer;
+
+ LogSinkFactory(EventSerializer eventSerializer) {
+ this.eventSerializer = eventSerializer;
+ }
/**
* Creates a sink instance.
@@ -35,11 +38,12 @@ interface SinkFactory {
* @param sinkView Sink configuration view.
* @return Sink instance.
*/
- default Sink createSink(SinkView sinkView) {
+ @Override
+ public Sink createSink(SinkView sinkView) {
if (sinkView instanceof LogSinkView) {
- return new LogSink((LogSinkView) sinkView);
+ return new LogSink((LogSinkView) sinkView, eventSerializer);
}
- throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported
sink type: " + sinkView.type());
+ return SinkFactory.super.createSink(sinkView);
}
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
index 84650e68fb..e967122956 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.eventlog.impl;
import org.apache.ignite.internal.eventlog.api.Sink;
-import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
import org.apache.ignite.internal.eventlog.config.schema.SinkView;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.lang.ErrorGroups.Common;
@@ -26,9 +25,7 @@ import org.apache.ignite.lang.ErrorGroups.Common;
/**
* Factory for creating sink instances.
*/
-interface SinkFactory {
- SinkFactory DEFAULT = new SinkFactory() {};
-
+public interface SinkFactory {
/**
* Creates a sink instance.
*
@@ -36,10 +33,6 @@ interface SinkFactory {
* @return Sink instance.
*/
default Sink createSink(SinkView sinkView) {
- if (sinkView instanceof LogSinkView) {
- return new LogSink((LogSinkView) sinkView);
- }
-
throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported
sink type: " + sinkView.type());
}
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
new file mode 100644
index 0000000000..6bed8d9e21
--- /dev/null
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+/**
+ * A factory to create a serializer useful for sink to write the event.
+ */
+public class EventSerializerFactory {
+ /**
+ * Creates a serializer which can be used in a sink to serialize all kinds
of events that were registered.
+ */
+ public EventSerializer createEventSerializer() {
+ return new JacksonBasedJsonSerializer();
+ }
+}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
similarity index 64%
rename from
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
rename to
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
index fb543dd1c6..1543ed73a5 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
@@ -18,28 +18,31 @@
package org.apache.ignite.internal.eventlog.ser;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import java.io.IOException;
import org.apache.ignite.internal.eventlog.api.Event;
-import org.apache.ignite.internal.eventlog.event.EventImpl;
import org.apache.ignite.internal.eventlog.event.EventUser;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.lang.ErrorGroups.Common;
-/** Serializes events to JSON. */
-public class JsonEventSerializer implements EventSerializer {
+/** Serializes events to JSON. Uses provided json serializer to serialize
events of specified class. */
+public class JacksonBasedJsonSerializer implements EventSerializer {
private final ObjectMapper mapper;
/** Default constructor. */
- public JsonEventSerializer() {
+ public JacksonBasedJsonSerializer() {
mapper = new ObjectMapper();
- SimpleModule module = new SimpleModule();
- module.addSerializer(EventImpl.class, new
EventImplJacksonSerializer());
+ mapper.registerModule(eventUserModule());
+ }
+
+ private static Module eventUserModule() {
+ SimpleModule module = new SimpleModule("EventUser");
module.addSerializer(EventUser.class, new
EventUserJacksonSerializer());
- mapper.registerModule(module);
+ return module;
}
/** {@inheritDoc} */
@@ -52,37 +55,7 @@ public class JsonEventSerializer implements EventSerializer {
}
}
- static class EventImplJacksonSerializer extends StdSerializer<EventImpl> {
- private EventImpl value;
- private JsonGenerator jgen;
- private SerializerProvider provider;
-
- EventImplJacksonSerializer() {
- this(null);
- }
-
- EventImplJacksonSerializer(Class<EventImpl> e) {
- super(e);
- }
-
- @Override
- public void serialize(EventImpl value, JsonGenerator jgen,
SerializerProvider provider) throws IOException {
- this.value = value;
- this.jgen = jgen;
- this.provider = provider;
-
- jgen.writeStartObject();
- jgen.writeStringField("type", value.type());
- jgen.writeNumberField("timestamp", value.timestamp());
- jgen.writeStringField("productVersion", value.productVersion());
- jgen.writeObjectField("user", value.user());
- jgen.writeObjectField("fields", value.fields());
- jgen.writeEndObject();
- }
- }
-
-
- static class EventUserJacksonSerializer extends StdSerializer<EventUser> {
+ private static class EventUserJacksonSerializer extends
StdSerializer<EventUser> {
private EventUser value;
private JsonGenerator jgen;
private SerializerProvider provider;
@@ -107,4 +80,5 @@ public class JsonEventSerializer implements EventSerializer {
jgen.writeEndObject();
}
}
+
}
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
index bd1ebc950c..44556ae0c0 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
@@ -52,11 +52,11 @@ class EventBuilderTest {
.fields(Map.of("key", "value"))
.build();
- assertEquals(EVENT_TYPE, event.type());
- assertEquals(1, event.timestamp());
- assertEquals("1.1.1", event.productVersion());
- assertEquals(EventUser.system(), event.user());
- assertEquals(Map.of("key", "value"), event.fields());
+ assertEquals(EVENT_TYPE, event.getType());
+ assertEquals(1, event.getTimestamp());
+ assertEquals("1.1.1", event.getProductVersion());
+ assertEquals(EventUser.system(), event.getUser());
+ assertEquals(Map.of("key", "value"), event.getFields());
}
@Test
@@ -68,11 +68,11 @@ class EventBuilderTest {
.user(EventUser.system())
.build();
- assertEquals(EVENT_TYPE, event.type());
- assertEquals(1, event.timestamp());
- assertEquals("1.1.1", event.productVersion());
- assertEquals(EventUser.system(), event.user());
- assertEquals(Map.of(), event.fields());
+ assertEquals(EVENT_TYPE, event.getType());
+ assertEquals(1, event.getTimestamp());
+ assertEquals("1.1.1", event.getProductVersion());
+ assertEquals(EventUser.system(), event.getUser());
+ assertEquals(Map.of(), event.getFields());
}
@Test
@@ -82,11 +82,11 @@ class EventBuilderTest {
.user(EventUser.system())
.build();
- assertEquals(EVENT_TYPE, event.type());
- assertThat(event.timestamp(), greaterThan(0L));
- assertEquals("3.0.0", event.productVersion());
- assertEquals(EventUser.system(), event.user());
- assertEquals(Map.of(), event.fields());
+ assertEquals(EVENT_TYPE, event.getType());
+ assertThat(event.getTimestamp(), greaterThan(0L));
+ assertEquals("3.0.0", event.getProductVersion());
+ assertEquals(EventUser.system(), event.getUser());
+ assertEquals(Map.of(), event.getFields());
}
@Test
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
index c1749521f8..7819288b34 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
@@ -42,7 +42,7 @@ class IgniteEventsTest {
Event.builder()
.type("CONNECTION_CLOSED")
.productVersion("3.0.0")
- .timestamp(connectionClosedEvent.timestamp())
+
.timestamp(connectionClosedEvent.getTimestamp())
.user(EventUser.of(USER, PROVIDER))
.build()
),
@@ -51,7 +51,7 @@ class IgniteEventsTest {
Event.builder()
.type("USER_AUTHENTICATED")
.productVersion("3.0.0")
-
.timestamp(connectionEstablishedEvent.timestamp())
+
.timestamp(connectionEstablishedEvent.getTimestamp())
.user(EventUser.of(USER, PROVIDER))
.build()
)
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
index 2fdea06f9d..92b54bbfab 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
@@ -29,6 +29,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.eventlog.api.EventChannel;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -45,7 +46,10 @@ class ConfigurationBasedChannelRegistryTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
- registry = new ConfigurationBasedChannelRegistry(cfg, new
ConfigurationBasedSinkRegistry(cfg, SinkFactory.DEFAULT));
+ registry = new ConfigurationBasedChannelRegistry(cfg, new
ConfigurationBasedSinkRegistry(
+ cfg,
+ new LogSinkFactory(new
EventSerializerFactory().createEventSerializer()))
+ );
}
@Test
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
index 768a41199d..d0feefd616 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ class ConfigurationBasedSinkRegistryTest extends
BaseIgniteAbstractTest {
@BeforeEach
void setUp() {
- registry = new ConfigurationBasedSinkRegistry(cfg,
SinkFactory.DEFAULT);
+ registry = new ConfigurationBasedSinkRegistry(cfg, new
LogSinkFactory(new EventSerializerFactory().createEventSerializer()));
}
@Test
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
index 54f9f0d217..f5b19ca59f 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
@@ -66,7 +66,7 @@ class EventLogTest {
// When add a channel but there is no sink.
channelRegistry.register(TEST_CHANNEL_NAME, () ->
channelFactory.createChannel(
- TEST_CHANNEL_NAME, Set.of(TEST_EVENT.type()))
+ TEST_CHANNEL_NAME, Set.of(TEST_EVENT.getType()))
);
// Then nothing thrown.
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
index e74c593b71..9ddf6c1861 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.eventlog.api.Sink;
import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange;
import org.apache.ignite.internal.eventlog.event.EventUser;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -70,7 +71,8 @@ class LogSinkTest extends BaseIgniteAbstractTest {
logSinkChange.changeFormat("json");
})).get();
// And log sink.
- Sink logSink =
SinkFactory.DEFAULT.createSink(cfg.sinks().get("logSink").value());
+ Sink logSink = new LogSinkFactory(new
EventSerializerFactory().createEventSerializer())
+ .createSink(cfg.sinks().get("logSink").value());
// And event.
Event event = IgniteEvents.USER_AUTHENTICATED.create(
EventUser.of("user1", "basicProvider")
@@ -84,8 +86,8 @@ class LogSinkTest extends BaseIgniteAbstractTest {
// And event is written in JSON format.
var expectedEventJson = "{"
+ "\"type\":\"USER_AUTHENTICATED\","
- + "\"timestamp\":" + event.timestamp() + ","
- + "\"productVersion\":\"" + event.productVersion() + "\","
+ + "\"timestamp\":" + event.getTimestamp() + ","
+ + "\"productVersion\":\"" + event.getProductVersion() + "\","
+
"\"user\":{\"username\":\"user1\",\"authenticationProvider\":\"basicProvider\"},"
+ "\"fields\":{}"
+ "}";
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
new file mode 100644
index 0000000000..52e6acd0ee
--- /dev/null
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+import java.util.Map;
+import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.event.EventUser;
+
+class CustomEvent implements Event {
+ private final long timestamp;
+ private final String productVersion;
+ private final EventUser eventUser;
+ private final Message message;
+
+ CustomEvent(long timestamp, String productVersion, EventUser eventUser,
Message message) {
+ this.timestamp = timestamp;
+ this.productVersion = productVersion;
+ this.eventUser = eventUser;
+ this.message = message;
+ }
+
+ @Override
+ public String getType() {
+ return "CUSTOM";
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String getProductVersion() {
+ return productVersion;
+ }
+
+ @Override
+ public EventUser getUser() {
+ return eventUser;
+ }
+
+ @Override
+ public Map<String, Object> getFields() {
+ return Map.of(
+ "hasMessage", message != null
+ );
+ }
+
+ public Message getMessage() {
+ return message;
+ }
+}
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
new file mode 100644
index 0000000000..84c7438ae4
--- /dev/null
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+import org.apache.ignite.internal.eventlog.event.EventUser;
+
+class CustomEventBuilder {
+ private long timestamp;
+ private String productVersion;
+ private EventUser eventUser;
+ private Message message;
+
+ public CustomEventBuilder timestamp(long timestamp) {
+ this.timestamp = timestamp;
+ return this;
+ }
+
+ public CustomEventBuilder productVersion(String productVersion) {
+ this.productVersion = productVersion;
+ return this;
+ }
+
+ public CustomEventBuilder user(EventUser eventUser) {
+ this.eventUser = eventUser;
+ return this;
+ }
+
+ public CustomEventBuilder message(Message message) {
+ this.message = message;
+ return this;
+ }
+
+ public CustomEvent build() {
+ return new CustomEvent(timestamp, productVersion, eventUser, message);
+ }
+}
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
similarity index 66%
rename from
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
rename to
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
index 9473db1c80..5615536041 100644
---
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-class JsonEventSerializerTest {
+class JacksonBasedJsonSerializerTest {
private static Stream<Arguments> events() {
return Stream.of(
Arguments.of(
@@ -72,15 +72,48 @@ class JsonEventSerializerTest {
+
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
+
"\"fields\":{\"id\":\"123\",\"ip\":\"127.0.0.1\"}"
+ "}"
+ ),
+ Arguments.of(
+ new CustomEventBuilder()
+ .productVersion("3.0.0")
+ .timestamp(1234567890)
+ .user(EventUser.of("test_user",
"test_provider"))
+ .message(new Message(1, "test"))
+ .build(),
+ "{\"type\":\"CUSTOM\","
+ + "\"timestamp\":1234567890,"
+ + "\"productVersion\":\"3.0.0\","
+ +
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
+ +
"\"message\":{\"version\":1,\"body\":\"test\"},"
+ + "\"fields\":{\"hasMessage\":true}"
+ + "}"
+ ),
+ Arguments.of(
+ IgniteEvents.USER_AUTHENTICATED.builder()
+ .productVersion("3.0.0")
+ .timestamp(1234567890)
+ .user(EventUser.of("test_user",
"test_provider"))
+ .fields(Map.of(
+ "ip", "127.0.0.1",
+ "id", "123",
+ "message", new Message(1, "foo")
+ ))
+ .build(),
+ "{\"type\":\"USER_AUTHENTICATED\","
+ + "\"timestamp\":1234567890,"
+ + "\"productVersion\":\"3.0.0\","
+ +
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
+ +
"\"fields\":{\"id\":\"123\",\"ip\":\"127.0.0.1\",\"message\":{\"version\":1,\"body\":\"foo\"}}"
+ + "}"
)
);
}
- private JsonEventSerializer serializer;
+ private JacksonBasedJsonSerializer serializer;
@BeforeEach
void setUp() {
- serializer = new JsonEventSerializer();
+ serializer = new JacksonBasedJsonSerializer();
}
@ParameterizedTest
diff --git
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
new file mode 100644
index 0000000000..b1fa183e32
--- /dev/null
+++
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+class Message {
+ private final int version;
+ private final String body;
+
+ public Message(int version, String body) {
+ this.version = version;
+ this.body = body;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public String getBody() {
+ return body;
+ }
+}