This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a3e8fd1266 IGNITE-21904 Mechanism to provide custom Event 
implementations to register EventSerializer (#3606)
a3e8fd1266 is described below

commit a3e8fd1266334aa09e3789f15db2262a60057578
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Tue Apr 16 12:29:06 2024 +0300

    IGNITE-21904 Mechanism to provide custom Event implementations to register 
EventSerializer (#3606)
---
 .../apache/ignite/internal/eventlog/api/Event.java | 15 +++--
 .../ignite/internal/eventlog/event/EventImpl.java  | 13 ++---
 .../internal/eventlog/impl/EventLogImpl.java       |  5 +-
 .../ignite/internal/eventlog/impl/LogSink.java     |  5 +-
 .../impl/{SinkFactory.java => LogSinkFactory.java} | 18 +++---
 .../ignite/internal/eventlog/impl/SinkFactory.java |  9 +--
 .../eventlog/ser/EventSerializerFactory.java       | 30 ++++++++++
 ...alizer.java => JacksonBasedJsonSerializer.java} | 50 ++++------------
 .../internal/eventlog/event/EventBuilderTest.java  | 30 +++++-----
 .../internal/eventlog/event/IgniteEventsTest.java  |  4 +-
 .../ConfigurationBasedChannelRegistryTest.java     |  6 +-
 .../impl/ConfigurationBasedSinkRegistryTest.java   |  3 +-
 .../internal/eventlog/impl/EventLogTest.java       |  2 +-
 .../ignite/internal/eventlog/impl/LogSinkTest.java |  8 ++-
 .../ignite/internal/eventlog/ser/CustomEvent.java  | 67 ++++++++++++++++++++++
 .../internal/eventlog/ser/CustomEventBuilder.java  | 51 ++++++++++++++++
 ...st.java => JacksonBasedJsonSerializerTest.java} | 39 ++++++++++++-
 .../ignite/internal/eventlog/ser/Message.java      | 36 ++++++++++++
 18 files changed, 294 insertions(+), 97 deletions(-)

diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
index 8e5f4f6f57..ccc7a78c02 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/Event.java
@@ -22,7 +22,10 @@ import 
org.apache.ignite.internal.eventlog.event.EventBuilder;
 import org.apache.ignite.internal.eventlog.event.EventTypeRegistry;
 import org.apache.ignite.internal.eventlog.event.EventUser;
 
-/** Represents an event object that can be logged to the event log. */
+/**
+ * Represents an event object that can be logged to the event log.
+ * All implementations of this interface must be plain POJO.
+ **/
 public interface Event {
     /** Default builder for the event object. */
     static EventBuilder builder() {
@@ -30,17 +33,17 @@ public interface Event {
     }
 
     /** The type of the event. The type must be registered in the {@link 
EventTypeRegistry}. */
-    String type();
+    String getType();
 
     /** The unix timestamp of the event. */
-    long timestamp();
+    long getTimestamp();
 
     /** The product version. The version is compatible with semver. */
-    String productVersion();
+    String getProductVersion();
 
     /** The user that caused the event. If the user is not available, the 
method returns a system user. */
-    EventUser user();
+    EventUser getUser();
 
     /** The event-specific fields of the event. */
-    Map<String, Object> fields();
+    Map<String, Object> getFields();
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
index c5691f2aeb..fa0918ae97 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/event/EventImpl.java
@@ -20,13 +20,12 @@ package org.apache.ignite.internal.eventlog.event;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.ignite.internal.eventlog.api.Event;
-import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
 
 /**
  * Implementation of the {@link Event} interface. The class is immutable and 
thread-safe. If you want to create an instance of this class,
  * use the {@link EventBuilder}.
  *
- * <p>NOTE: If you rename/add any field in this class, you should also update 
the {@link JsonEventSerializer}.
+ * <p>NOTE: This class should always be a plain POJO.
  */
 public class EventImpl implements Event {
     private final String type;
@@ -48,27 +47,27 @@ public class EventImpl implements Event {
     }
 
     @Override
-    public String type() {
+    public String getType() {
         return type;
     }
 
     @Override
-    public long timestamp() {
+    public long getTimestamp() {
         return timestamp;
     }
 
     @Override
-    public String productVersion() {
+    public String getProductVersion() {
         return productVersion;
     }
 
     @Override
-    public EventUser user() {
+    public EventUser getUser() {
         return user;
     }
 
     @Override
-    public Map<String, Object> fields() {
+    public Map<String, Object> getFields() {
         return fields;
     }
 
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
index 487a43374b..16122366c5 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/EventLogImpl.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.eventlog.api.Event;
 import org.apache.ignite.internal.eventlog.api.EventChannel;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
 
 /**
  * Implementation of the {@link EventLog} interface.
@@ -45,7 +46,7 @@ public class EventLogImpl implements EventLog {
      * @param cfg the configuration.
      */
     public EventLogImpl(EventLogConfiguration cfg) {
-        this(cfg, SinkFactory.DEFAULT);
+        this(cfg, new LogSinkFactory(new 
EventSerializerFactory().createEventSerializer()));
     }
 
     public EventLogImpl(EventLogConfiguration cfg, SinkFactory sinkFactory) {
@@ -55,7 +56,7 @@ public class EventLogImpl implements EventLog {
     @Override
     public void log(Supplier<Event> eventProvider) {
         Event event = eventProvider.get();
-        Set<EventChannel> channel = 
channelRegistry.findAllChannelsByEventType(event.type());
+        Set<EventChannel> channel = 
channelRegistry.findAllChannelsByEventType(event.getType());
         channel.forEach(c -> c.log(event));
     }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
index c2aa43ff0b..75df0614d1 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSink.java
@@ -23,7 +23,6 @@ import org.apache.ignite.internal.eventlog.api.Event;
 import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
 import org.apache.ignite.internal.eventlog.ser.EventSerializer;
-import org.apache.ignite.internal.eventlog.ser.JsonEventSerializer;
 
 /** Sink that writes events to the log using any logging framework the user 
has configured. */
 class LogSink implements Sink {
@@ -31,10 +30,10 @@ class LogSink implements Sink {
     private final EventSerializer serializer;
     private final String level;
 
-    LogSink(LogSinkView cfg) {
+    LogSink(LogSinkView cfg, EventSerializer eventSerializer) {
         this.level = cfg.level();
         this.logger = System.getLogger(cfg.criteria());
-        this.serializer = new JsonEventSerializer();
+        this.serializer = eventSerializer;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
similarity index 73%
copy from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
copy to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
index 84650e68fb..3a458ab06b 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/LogSinkFactory.java
@@ -20,14 +20,17 @@ package org.apache.ignite.internal.eventlog.impl;
 import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
 import org.apache.ignite.internal.eventlog.config.schema.SinkView;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.internal.eventlog.ser.EventSerializer;
 
 /**
  * Factory for creating sink instances.
  */
-interface SinkFactory {
-    SinkFactory DEFAULT = new SinkFactory() {};
+class LogSinkFactory implements SinkFactory {
+    private final EventSerializer eventSerializer;
+
+    LogSinkFactory(EventSerializer eventSerializer) {
+        this.eventSerializer = eventSerializer;
+    }
 
     /**
      * Creates a sink instance.
@@ -35,11 +38,12 @@ interface SinkFactory {
      * @param sinkView Sink configuration view.
      * @return Sink instance.
      */
-    default Sink createSink(SinkView sinkView) {
+    @Override
+    public Sink createSink(SinkView sinkView) {
         if (sinkView instanceof LogSinkView) {
-            return new LogSink((LogSinkView) sinkView);
+            return new LogSink((LogSinkView) sinkView, eventSerializer);
         }
 
-        throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported 
sink type: " + sinkView.type());
+        return SinkFactory.super.createSink(sinkView);
     }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
index 84650e68fb..e967122956 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/impl/SinkFactory.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.eventlog.impl;
 
 import org.apache.ignite.internal.eventlog.api.Sink;
-import org.apache.ignite.internal.eventlog.config.schema.LogSinkView;
 import org.apache.ignite.internal.eventlog.config.schema.SinkView;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.lang.ErrorGroups.Common;
@@ -26,9 +25,7 @@ import org.apache.ignite.lang.ErrorGroups.Common;
 /**
  * Factory for creating sink instances.
  */
-interface SinkFactory {
-    SinkFactory DEFAULT = new SinkFactory() {};
-
+public interface SinkFactory {
     /**
      * Creates a sink instance.
      *
@@ -36,10 +33,6 @@ interface SinkFactory {
      * @return Sink instance.
      */
     default Sink createSink(SinkView sinkView) {
-        if (sinkView instanceof LogSinkView) {
-            return new LogSink((LogSinkView) sinkView);
-        }
-
         throw new IgniteInternalException(Common.INTERNAL_ERR, "Unsupported 
sink type: " + sinkView.type());
     }
 }
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
new file mode 100644
index 0000000000..6bed8d9e21
--- /dev/null
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/EventSerializerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+/**
+ * A factory to create a serializer useful for sink to write the event.
+ */
+public class EventSerializerFactory {
+    /**
+     * Creates a serializer which can be used in a sink to serialize all kinds 
of events that were registered.
+     */
+    public EventSerializer createEventSerializer() {
+        return new JacksonBasedJsonSerializer();
+    }
+}
diff --git 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
similarity index 64%
rename from 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
rename to 
modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
index fb543dd1c6..1543ed73a5 100644
--- 
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializer.java
+++ 
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializer.java
@@ -18,28 +18,31 @@
 package org.apache.ignite.internal.eventlog.ser;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import java.io.IOException;
 import org.apache.ignite.internal.eventlog.api.Event;
-import org.apache.ignite.internal.eventlog.event.EventImpl;
 import org.apache.ignite.internal.eventlog.event.EventUser;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.lang.ErrorGroups.Common;
 
-/** Serializes events to JSON. */
-public class JsonEventSerializer implements EventSerializer {
+/** Serializes events to JSON. Uses provided json serializer to serialize 
events of specified class. */
+public class JacksonBasedJsonSerializer implements EventSerializer {
     private final ObjectMapper mapper;
 
     /** Default constructor. */
-    public JsonEventSerializer() {
+    public JacksonBasedJsonSerializer() {
         mapper = new ObjectMapper();
-        SimpleModule module = new SimpleModule();
-        module.addSerializer(EventImpl.class, new 
EventImplJacksonSerializer());
+        mapper.registerModule(eventUserModule());
+    }
+
+    private static Module eventUserModule() {
+        SimpleModule module = new SimpleModule("EventUser");
         module.addSerializer(EventUser.class, new 
EventUserJacksonSerializer());
-        mapper.registerModule(module);
+        return module;
     }
 
     /** {@inheritDoc} */
@@ -52,37 +55,7 @@ public class JsonEventSerializer implements EventSerializer {
         }
     }
 
-    static class EventImplJacksonSerializer extends StdSerializer<EventImpl> {
-        private EventImpl value;
-        private JsonGenerator jgen;
-        private SerializerProvider provider;
-
-        EventImplJacksonSerializer() {
-            this(null);
-        }
-
-        EventImplJacksonSerializer(Class<EventImpl> e) {
-            super(e);
-        }
-
-        @Override
-        public void serialize(EventImpl value, JsonGenerator jgen, 
SerializerProvider provider) throws IOException {
-            this.value = value;
-            this.jgen = jgen;
-            this.provider = provider;
-
-            jgen.writeStartObject();
-            jgen.writeStringField("type", value.type());
-            jgen.writeNumberField("timestamp", value.timestamp());
-            jgen.writeStringField("productVersion", value.productVersion());
-            jgen.writeObjectField("user", value.user());
-            jgen.writeObjectField("fields", value.fields());
-            jgen.writeEndObject();
-        }
-    }
-
-
-    static class EventUserJacksonSerializer extends StdSerializer<EventUser> {
+    private static class EventUserJacksonSerializer extends 
StdSerializer<EventUser> {
         private EventUser value;
         private JsonGenerator jgen;
         private SerializerProvider provider;
@@ -107,4 +80,5 @@ public class JsonEventSerializer implements EventSerializer {
             jgen.writeEndObject();
         }
     }
+
 }
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
index bd1ebc950c..44556ae0c0 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/EventBuilderTest.java
@@ -52,11 +52,11 @@ class EventBuilderTest {
                 .fields(Map.of("key", "value"))
                 .build();
 
-        assertEquals(EVENT_TYPE, event.type());
-        assertEquals(1, event.timestamp());
-        assertEquals("1.1.1", event.productVersion());
-        assertEquals(EventUser.system(), event.user());
-        assertEquals(Map.of("key", "value"), event.fields());
+        assertEquals(EVENT_TYPE, event.getType());
+        assertEquals(1, event.getTimestamp());
+        assertEquals("1.1.1", event.getProductVersion());
+        assertEquals(EventUser.system(), event.getUser());
+        assertEquals(Map.of("key", "value"), event.getFields());
     }
 
     @Test
@@ -68,11 +68,11 @@ class EventBuilderTest {
                 .user(EventUser.system())
                 .build();
 
-        assertEquals(EVENT_TYPE, event.type());
-        assertEquals(1, event.timestamp());
-        assertEquals("1.1.1", event.productVersion());
-        assertEquals(EventUser.system(), event.user());
-        assertEquals(Map.of(), event.fields());
+        assertEquals(EVENT_TYPE, event.getType());
+        assertEquals(1, event.getTimestamp());
+        assertEquals("1.1.1", event.getProductVersion());
+        assertEquals(EventUser.system(), event.getUser());
+        assertEquals(Map.of(), event.getFields());
     }
 
     @Test
@@ -82,11 +82,11 @@ class EventBuilderTest {
                 .user(EventUser.system())
                 .build();
 
-        assertEquals(EVENT_TYPE, event.type());
-        assertThat(event.timestamp(), greaterThan(0L));
-        assertEquals("3.0.0", event.productVersion());
-        assertEquals(EventUser.system(), event.user());
-        assertEquals(Map.of(), event.fields());
+        assertEquals(EVENT_TYPE, event.getType());
+        assertThat(event.getTimestamp(), greaterThan(0L));
+        assertEquals("3.0.0", event.getProductVersion());
+        assertEquals(EventUser.system(), event.getUser());
+        assertEquals(Map.of(), event.getFields());
     }
 
     @Test
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
index c1749521f8..7819288b34 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/event/IgniteEventsTest.java
@@ -42,7 +42,7 @@ class IgniteEventsTest {
                         Event.builder()
                                 .type("CONNECTION_CLOSED")
                                 .productVersion("3.0.0")
-                                .timestamp(connectionClosedEvent.timestamp())
+                                
.timestamp(connectionClosedEvent.getTimestamp())
                                 .user(EventUser.of(USER, PROVIDER))
                                 .build()
                 ),
@@ -51,7 +51,7 @@ class IgniteEventsTest {
                         Event.builder()
                                 .type("USER_AUTHENTICATED")
                                 .productVersion("3.0.0")
-                                
.timestamp(connectionEstablishedEvent.timestamp())
+                                
.timestamp(connectionEstablishedEvent.getTimestamp())
                                 .user(EventUser.of(USER, PROVIDER))
                                 .build()
                 )
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
index 2fdea06f9d..92b54bbfab 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedChannelRegistryTest.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
 import org.apache.ignite.internal.eventlog.api.EventChannel;
 import org.apache.ignite.internal.eventlog.api.IgniteEventType;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -45,7 +46,10 @@ class ConfigurationBasedChannelRegistryTest extends 
BaseIgniteAbstractTest {
 
     @BeforeEach
     void setUp() {
-        registry = new ConfigurationBasedChannelRegistry(cfg, new 
ConfigurationBasedSinkRegistry(cfg, SinkFactory.DEFAULT));
+        registry = new ConfigurationBasedChannelRegistry(cfg, new 
ConfigurationBasedSinkRegistry(
+                cfg,
+                new LogSinkFactory(new 
EventSerializerFactory().createEventSerializer()))
+        );
     }
 
     @Test
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
index 768a41199d..d0feefd616 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/ConfigurationBasedSinkRegistryTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,7 +44,7 @@ class ConfigurationBasedSinkRegistryTest extends 
BaseIgniteAbstractTest {
 
     @BeforeEach
     void setUp() {
-        registry = new ConfigurationBasedSinkRegistry(cfg, 
SinkFactory.DEFAULT);
+        registry = new ConfigurationBasedSinkRegistry(cfg, new 
LogSinkFactory(new EventSerializerFactory().createEventSerializer()));
     }
 
     @Test
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
index 54f9f0d217..f5b19ca59f 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/EventLogTest.java
@@ -66,7 +66,7 @@ class EventLogTest {
 
         // When add a channel but there is no sink.
         channelRegistry.register(TEST_CHANNEL_NAME, () -> 
channelFactory.createChannel(
-                TEST_CHANNEL_NAME, Set.of(TEST_EVENT.type()))
+                TEST_CHANNEL_NAME, Set.of(TEST_EVENT.getType()))
         );
 
         // Then nothing thrown.
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
index e74c593b71..9ddf6c1861 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/impl/LogSinkTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.eventlog.api.Sink;
 import org.apache.ignite.internal.eventlog.config.schema.EventLogConfiguration;
 import org.apache.ignite.internal.eventlog.config.schema.LogSinkChange;
 import org.apache.ignite.internal.eventlog.event.EventUser;
+import org.apache.ignite.internal.eventlog.ser.EventSerializerFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -70,7 +71,8 @@ class LogSinkTest extends BaseIgniteAbstractTest {
             logSinkChange.changeFormat("json");
         })).get();
         // And log sink.
-        Sink logSink = 
SinkFactory.DEFAULT.createSink(cfg.sinks().get("logSink").value());
+        Sink logSink = new LogSinkFactory(new 
EventSerializerFactory().createEventSerializer())
+                .createSink(cfg.sinks().get("logSink").value());
         // And event.
         Event event = IgniteEvents.USER_AUTHENTICATED.create(
                 EventUser.of("user1", "basicProvider")
@@ -84,8 +86,8 @@ class LogSinkTest extends BaseIgniteAbstractTest {
         // And event is written in JSON format.
         var expectedEventJson = "{"
                 + "\"type\":\"USER_AUTHENTICATED\","
-                + "\"timestamp\":" + event.timestamp() + ","
-                + "\"productVersion\":\"" + event.productVersion() + "\","
+                + "\"timestamp\":" + event.getTimestamp() + ","
+                + "\"productVersion\":\"" + event.getProductVersion() + "\","
                 + 
"\"user\":{\"username\":\"user1\",\"authenticationProvider\":\"basicProvider\"},"
                 + "\"fields\":{}"
                 + "}";
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
new file mode 100644
index 0000000000..52e6acd0ee
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEvent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+import java.util.Map;
+import org.apache.ignite.internal.eventlog.api.Event;
+import org.apache.ignite.internal.eventlog.event.EventUser;
+
+class CustomEvent implements Event {
+    private final long timestamp;
+    private final String productVersion;
+    private final EventUser eventUser;
+    private final Message message;
+
+    CustomEvent(long timestamp, String productVersion, EventUser eventUser, 
Message message) {
+        this.timestamp = timestamp;
+        this.productVersion = productVersion;
+        this.eventUser = eventUser;
+        this.message = message;
+    }
+
+    @Override
+    public String getType() {
+        return "CUSTOM";
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String getProductVersion() {
+        return productVersion;
+    }
+
+    @Override
+    public EventUser getUser() {
+        return eventUser;
+    }
+
+    @Override
+    public Map<String, Object> getFields() {
+        return Map.of(
+                "hasMessage", message != null
+        );
+    }
+
+    public Message getMessage() {
+        return message;
+    }
+}
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
new file mode 100644
index 0000000000..84c7438ae4
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/CustomEventBuilder.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+import org.apache.ignite.internal.eventlog.event.EventUser;
+
+class CustomEventBuilder {
+    private long timestamp;
+    private String productVersion;
+    private EventUser eventUser;
+    private Message message;
+
+    public CustomEventBuilder timestamp(long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+
+    public CustomEventBuilder productVersion(String productVersion) {
+        this.productVersion = productVersion;
+        return this;
+    }
+
+    public CustomEventBuilder user(EventUser eventUser) {
+        this.eventUser = eventUser;
+        return this;
+    }
+
+    public CustomEventBuilder message(Message message) {
+        this.message = message;
+        return this;
+    }
+
+    public CustomEvent build() {
+        return new CustomEvent(timestamp, productVersion, eventUser, message);
+    }
+}
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
similarity index 66%
rename from 
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
rename to 
modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
index 9473db1c80..5615536041 100644
--- 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JsonEventSerializerTest.java
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/JacksonBasedJsonSerializerTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-class JsonEventSerializerTest {
+class JacksonBasedJsonSerializerTest {
     private static Stream<Arguments> events() {
         return Stream.of(
                 Arguments.of(
@@ -72,15 +72,48 @@ class JsonEventSerializerTest {
                                 + 
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
                                 + 
"\"fields\":{\"id\":\"123\",\"ip\":\"127.0.0.1\"}"
                                 + "}"
+                ),
+                Arguments.of(
+                        new CustomEventBuilder()
+                                .productVersion("3.0.0")
+                                .timestamp(1234567890)
+                                .user(EventUser.of("test_user", 
"test_provider"))
+                                .message(new Message(1, "test"))
+                                .build(),
+                        "{\"type\":\"CUSTOM\","
+                                + "\"timestamp\":1234567890,"
+                                + "\"productVersion\":\"3.0.0\","
+                                + 
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
+                                + 
"\"message\":{\"version\":1,\"body\":\"test\"},"
+                                + "\"fields\":{\"hasMessage\":true}"
+                                + "}"
+                ),
+                Arguments.of(
+                        IgniteEvents.USER_AUTHENTICATED.builder()
+                                .productVersion("3.0.0")
+                                .timestamp(1234567890)
+                                .user(EventUser.of("test_user", 
"test_provider"))
+                                .fields(Map.of(
+                                        "ip", "127.0.0.1",
+                                        "id", "123",
+                                        "message", new Message(1, "foo")
+                                ))
+                                .build(),
+                        "{\"type\":\"USER_AUTHENTICATED\","
+                                + "\"timestamp\":1234567890,"
+                                + "\"productVersion\":\"3.0.0\","
+                                + 
"\"user\":{\"username\":\"test_user\",\"authenticationProvider\":\"test_provider\"},"
+                                + 
"\"fields\":{\"id\":\"123\",\"ip\":\"127.0.0.1\",\"message\":{\"version\":1,\"body\":\"foo\"}}"
+                                + "}"
                 )
         );
     }
 
-    private JsonEventSerializer serializer;
+    private JacksonBasedJsonSerializer serializer;
 
     @BeforeEach
     void setUp() {
-        serializer = new JsonEventSerializer();
+        serializer = new JacksonBasedJsonSerializer();
     }
 
     @ParameterizedTest
diff --git 
a/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
new file mode 100644
index 0000000000..b1fa183e32
--- /dev/null
+++ 
b/modules/eventlog/src/test/java/org/apache/ignite/internal/eventlog/ser/Message.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.eventlog.ser;
+
+class Message {
+    private final int version;
+    private final String body;
+
+    public Message(int version, String body) {
+        this.version = version;
+        this.body = body;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public String getBody() {
+        return body;
+    }
+}


Reply via email to