This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f68a6247c [#5551] feat(core): add topic pre-event to Gravitino event.
(#5614)
f68a6247c is described below
commit f68a6247c49eea70b00cbf9014a348a02f73b81a
Author: Edward Xu <[email protected]>
AuthorDate: Thu Nov 21 09:43:25 2024 +0800
[#5551] feat(core): add topic pre-event to Gravitino event. (#5614)
### What changes were proposed in this pull request?
Add pre-events for topic events, including `AlterTopicPreEvent`,
`CreateTopicPreEvent`, `DropTopicPreEvent`, `ListTopicPreEvent` and
`LoadTopicPreEvent`.
### Why are the changes needed?
Fix: #5551
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Related Unit Tests have been added.
---
.../gravitino/listener/TopicEventDispatcher.java | 14 ++++++-
.../listener/api/event/AlterTopicPreEvent.java | 45 ++++++++++++++++++++++
.../listener/api/event/CreateTopicPreEvent.java | 45 ++++++++++++++++++++++
.../listener/api/event/DropTopicPreEvent.java | 31 +++++++++++++++
.../listener/api/event/ListTopicPreEvent.java | 44 +++++++++++++++++++++
.../listener/api/event/LoadTopicPreEvent.java | 31 +++++++++++++++
.../listener/api/event/TopicPreEvent.java | 31 +++++++++++++++
.../listener/api/event/TestTopicEvent.java | 43 ++++++++++++++++++---
8 files changed, 278 insertions(+), 6 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/listener/TopicEventDispatcher.java
b/core/src/main/java/org/apache/gravitino/listener/TopicEventDispatcher.java
index 96416a533..eeaa1869a 100644
--- a/core/src/main/java/org/apache/gravitino/listener/TopicEventDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/listener/TopicEventDispatcher.java
@@ -27,14 +27,19 @@ import org.apache.gravitino.exceptions.NoSuchTopicException;
import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
import org.apache.gravitino.listener.api.event.AlterTopicEvent;
import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.AlterTopicPreEvent;
import org.apache.gravitino.listener.api.event.CreateTopicEvent;
import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.CreateTopicPreEvent;
import org.apache.gravitino.listener.api.event.DropTopicEvent;
import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.DropTopicPreEvent;
import org.apache.gravitino.listener.api.event.ListTopicEvent;
import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.ListTopicPreEvent;
import org.apache.gravitino.listener.api.event.LoadTopicEvent;
import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
+import org.apache.gravitino.listener.api.event.LoadTopicPreEvent;
import org.apache.gravitino.listener.api.info.TopicInfo;
import org.apache.gravitino.messaging.DataLayout;
import org.apache.gravitino.messaging.Topic;
@@ -66,6 +71,8 @@ public class TopicEventDispatcher implements TopicDispatcher {
@Override
public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException {
+ eventBus.dispatchEvent(
+ new AlterTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident,
changes));
try {
Topic topic = dispatcher.alterTopic(ident, changes);
eventBus.dispatchEvent(
@@ -81,6 +88,7 @@ public class TopicEventDispatcher implements TopicDispatcher {
@Override
public boolean dropTopic(NameIdentifier ident) {
+ eventBus.dispatchEvent(new
DropTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
boolean isExists = dispatcher.dropTopic(ident);
eventBus.dispatchEvent(
@@ -95,6 +103,7 @@ public class TopicEventDispatcher implements TopicDispatcher
{
@Override
public NameIdentifier[] listTopics(Namespace namespace) throws
NoSuchTopicException {
+ eventBus.dispatchEvent(new
ListTopicPreEvent(PrincipalUtils.getCurrentUserName(), namespace));
try {
NameIdentifier[] nameIdentifiers = dispatcher.listTopics(namespace);
eventBus.dispatchEvent(new
ListTopicEvent(PrincipalUtils.getCurrentUserName(), namespace));
@@ -108,6 +117,7 @@ public class TopicEventDispatcher implements
TopicDispatcher {
@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
+ eventBus.dispatchEvent(new
LoadTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
Topic topic = dispatcher.loadTopic(ident);
eventBus.dispatchEvent(
@@ -129,13 +139,15 @@ public class TopicEventDispatcher implements
TopicDispatcher {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String,
String> properties)
throws NoSuchTopicException, TopicAlreadyExistsException {
+ TopicInfo createTopicRequest = new TopicInfo(ident.name(), comment,
properties, null);
+ eventBus.dispatchEvent(
+ new CreateTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident,
createTopicRequest));
try {
Topic topic = dispatcher.createTopic(ident, comment, dataLayout,
properties);
eventBus.dispatchEvent(
new CreateTopicEvent(PrincipalUtils.getCurrentUserName(), ident, new
TopicInfo(topic)));
return topic;
} catch (Exception e) {
- TopicInfo createTopicRequest = new TopicInfo(ident.name(), comment,
properties, null);
eventBus.dispatchEvent(
new CreateTopicFailureEvent(
PrincipalUtils.getCurrentUserName(), ident, e,
createTopicRequest));
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/AlterTopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/AlterTopicPreEvent.java
new file mode 100644
index 000000000..65f4d7034
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/AlterTopicPreEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.messaging.TopicChange;
+
+/** Represents an event triggered before altering a topic. */
+@DeveloperApi
+public class AlterTopicPreEvent extends TopicPreEvent {
+ private final TopicChange[] topicChanges;
+
+ public AlterTopicPreEvent(String user, NameIdentifier identifier,
TopicChange[] topicChanges) {
+ super(user, identifier);
+ this.topicChanges = topicChanges;
+ }
+
+ /**
+ * Retrieves the changes that have been requested for the topic; they have not been
+ * applied yet when this event is dispatched.
+ *
+ * @return An array of {@link TopicChange} objects detailing each requested modification
+ * to the topic.
+ */
+ public TopicChange[] topicChanges() {
+ return topicChanges;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTopicPreEvent.java
new file mode 100644
index 000000000..f07148d89
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/CreateTopicPreEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.listener.api.info.TopicInfo;
+
+/** Represents an event triggered before creating a topic. */
+@DeveloperApi
+public class CreateTopicPreEvent extends TopicPreEvent {
+ private final TopicInfo createTopicRequest;
+
+ public CreateTopicPreEvent(String user, NameIdentifier identifier, TopicInfo
createTopicRequest) {
+ super(user, identifier);
+ this.createTopicRequest = createTopicRequest;
+ }
+
+ /**
+ * Retrieves the topic creation request.
+ *
+ * @return A {@link TopicInfo} instance encapsulating the details of the topic creation
+ * request.
+ */
+ public TopicInfo createTopicRequest() {
+ return createTopicRequest;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/DropTopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/DropTopicPreEvent.java
new file mode 100644
index 000000000..1de057e81
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/DropTopicPreEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an event triggered before dropping a topic. */
+@DeveloperApi
+public class DropTopicPreEvent extends TopicPreEvent {
+ public DropTopicPreEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/ListTopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/ListTopicPreEvent.java
new file mode 100644
index 000000000..e0d844ada
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/ListTopicPreEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an event triggered before listing the topics within a namespace. */
+@DeveloperApi
+public class ListTopicPreEvent extends TopicPreEvent {
+ private final Namespace namespace;
+
+ public ListTopicPreEvent(String user, Namespace namespace) {
+ super(user, NameIdentifier.of(namespace.levels()));
+ this.namespace = namespace;
+ }
+
+ /**
+ * Provides the namespace associated with this event.
+ *
+ * @return A {@link Namespace} instance from which topics are about to be listed.
+ */
+ public Namespace namespace() {
+ return namespace;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/LoadTopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/LoadTopicPreEvent.java
new file mode 100644
index 000000000..7c83e025e
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/LoadTopicPreEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an event triggered before loading a topic. */
+@DeveloperApi
+public class LoadTopicPreEvent extends TopicPreEvent {
+ public LoadTopicPreEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicPreEvent.java
b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicPreEvent.java
new file mode 100644
index 000000000..127367344
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicPreEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents a pre-event for topic operations. */
+@DeveloperApi
+public abstract class TopicPreEvent extends PreEvent {
+ protected TopicPreEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java
index 268c628c5..35a158cdd 100644
---
a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java
+++
b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java
@@ -65,17 +65,32 @@ public class TestTopicEvent {
void testCreateTopicEvent() {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog",
"topic");
dispatcher.createTopic(identifier, topic.comment(), null,
topic.properties());
- Event event = dummyEventListener.popPostEvent();
- Assertions.assertEquals(identifier, event.identifier());
- Assertions.assertEquals(CreateTopicEvent.class, event.getClass());
- TopicInfo topicInfo = ((CreateTopicEvent) event).createdTopicInfo();
- checkTopicInfo(topicInfo, topic);
+ {
+ PreEvent preEvent = dummyEventListener.popPreEvent();
+ Assertions.assertEquals(identifier, preEvent.identifier());
+ Assertions.assertEquals(CreateTopicPreEvent.class, preEvent.getClass());
+ TopicInfo topicInfo = ((CreateTopicPreEvent)
preEvent).createTopicRequest();
+ checkTopicInfo(topicInfo, topic);
+ }
+
+ {
+ Event event = dummyEventListener.popPostEvent();
+ Assertions.assertEquals(identifier, event.identifier());
+ Assertions.assertEquals(CreateTopicEvent.class, event.getClass());
+ TopicInfo topicInfo = ((CreateTopicEvent) event).createdTopicInfo();
+ checkTopicInfo(topicInfo, topic);
+ }
}
@Test
void testLoadTopicEvent() {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog",
"topic");
dispatcher.loadTopic(identifier);
+
+ PreEvent preEvent = dummyEventListener.popPreEvent();
+ Assertions.assertEquals(identifier, preEvent.identifier());
+ Assertions.assertEquals(LoadTopicPreEvent.class, preEvent.getClass());
+
Event event = dummyEventListener.popPostEvent();
Assertions.assertEquals(identifier, event.identifier());
Assertions.assertEquals(LoadTopicEvent.class, event.getClass());
@@ -88,6 +103,13 @@ public class TestTopicEvent {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog",
"topic");
TopicChange topicChange = TopicChange.setProperty("a", "b");
dispatcher.alterTopic(identifier, topicChange);
+
+ PreEvent preEvent = dummyEventListener.popPreEvent();
+ Assertions.assertEquals(identifier, preEvent.identifier());
+ Assertions.assertEquals(AlterTopicPreEvent.class, preEvent.getClass());
+ Assertions.assertEquals(1, ((AlterTopicPreEvent)
preEvent).topicChanges().length);
+ Assertions.assertEquals(topicChange, ((AlterTopicPreEvent)
preEvent).topicChanges()[0]);
+
Event event = dummyEventListener.popPostEvent();
Assertions.assertEquals(identifier, event.identifier());
Assertions.assertEquals(AlterTopicEvent.class, event.getClass());
@@ -101,6 +123,11 @@ public class TestTopicEvent {
void testDropTopicEvent() {
NameIdentifier identifier = NameIdentifier.of("metalake", "catalog",
"topic");
dispatcher.dropTopic(identifier);
+
+ PreEvent preEvent = dummyEventListener.popPreEvent();
+ Assertions.assertEquals(identifier, preEvent.identifier());
+ Assertions.assertEquals(DropTopicPreEvent.class, preEvent.getClass());
+
Event event = dummyEventListener.popPostEvent();
Assertions.assertEquals(identifier, event.identifier());
Assertions.assertEquals(DropTopicEvent.class, event.getClass());
@@ -111,6 +138,12 @@ public class TestTopicEvent {
void testListTopicEvent() {
Namespace namespace = Namespace.of("metalake", "catalog");
dispatcher.listTopics(namespace);
+
+ PreEvent preEvent = dummyEventListener.popPreEvent();
+ Assertions.assertEquals(namespace.toString(),
preEvent.identifier().toString());
+ Assertions.assertEquals(ListTopicPreEvent.class, preEvent.getClass());
+ Assertions.assertEquals(namespace, ((ListTopicPreEvent)
preEvent).namespace());
+
Event event = dummyEventListener.popPostEvent();
Assertions.assertEquals(namespace.toString(),
event.identifier().toString());
Assertions.assertEquals(ListTopicEvent.class, event.getClass());