This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 0119e0e4c [#5294] Add basic topic commands to Gravitino CLI (#5331)
0119e0e4c is described below
commit 0119e0e4c81a4289edf330f942a7ab2d51eb9428
Author: Justin Mclean <[email protected]>
AuthorDate: Fri Nov 29 14:23:02 2024 +1100
[#5294] Add basic topic commands to Gravitino CLI (#5331)
### What changes were proposed in this pull request?
Add basic topic commands to list, create and delete topics.
### Why are the changes needed?
To extend the Gravitino CLI.
Fix: #5294
### Does this PR introduce _any_ user-facing change?
No, but it extends the Gravitino CLI.
### How was this patch tested?
Tested locally with a local instance of Apache Kafka.
---
.../org/apache/gravitino/cli/CommandEntities.java | 2 +
.../org/apache/gravitino/cli/ErrorMessages.java | 2 +
.../java/org/apache/gravitino/cli/FullName.java | 9 +
.../apache/gravitino/cli/GravitinoCommandLine.java | 33 +++-
.../org/apache/gravitino/cli/GravitinoOptions.java | 22 ++-
.../apache/gravitino/cli/TestableCommandLine.java | 48 ++++++
.../{ListIndexes.java => CreateTopic.java} | 67 +++----
.../apache/gravitino/cli/commands/DeleteTopic.java | 99 +++++++++++
.../apache/gravitino/cli/commands/ListIndexes.java | 1 +
.../commands/{ListIndexes.java => ListTopics.java} | 60 +++----
.../gravitino/cli/commands/TopicDetails.java | 91 ++++++++++
.../gravitino/cli/commands/UpdateTopicComment.java | 95 ++++++++++
.../apache/gravitino/cli/TestTopicCommands.java | 192 +++++++++++++++++++++
docs/cli.md | 40 ++++-
14 files changed, 684 insertions(+), 77 deletions(-)
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/CommandEntities.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/CommandEntities.java
index aaf10e8e1..dc0332029 100644
--- a/clients/cli/src/main/java/org/apache/gravitino/cli/CommandEntities.java
+++ b/clients/cli/src/main/java/org/apache/gravitino/cli/CommandEntities.java
@@ -34,6 +34,7 @@ public class CommandEntities {
public static final String USER = "user";
public static final String GROUP = "group";
public static final String TAG = "tag";
+ public static final String TOPIC = "topic";
public static final String FILESET = "fileset";
public static final String ROLE = "role";
@@ -48,6 +49,7 @@ public class CommandEntities {
VALID_ENTITIES.add(USER);
VALID_ENTITIES.add(GROUP);
VALID_ENTITIES.add(TAG);
+ VALID_ENTITIES.add(TOPIC);
VALID_ENTITIES.add(FILESET);
VALID_ENTITIES.add(ROLE);
}
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/ErrorMessages.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/ErrorMessages.java
index 873ded785..2a34e21cc 100644
--- a/clients/cli/src/main/java/org/apache/gravitino/cli/ErrorMessages.java
+++ b/clients/cli/src/main/java/org/apache/gravitino/cli/ErrorMessages.java
@@ -41,6 +41,8 @@ public class ErrorMessages {
public static final String MULTIPLE_TAG_COMMAND_ERROR =
"Error: The current command only supports one --tag option.";
public static final String TAG_EXISTS = "Tag already exists.";
+ public static final String UNKNOWN_TOPIC = "Unknown topic.";
+ public static final String TOPIC_EXISTS = "Topic already exists.";
public static final String UNKNOWN_FILESET = "Unknown fileset.";
public static final String FILESET_EXISTS = "Fileset already exists.";
public static final String TAG_EMPTY = "Error: Must configure --tag option.";
diff --git a/clients/cli/src/main/java/org/apache/gravitino/cli/FullName.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/FullName.java
index 3c53401ca..959282485 100644
--- a/clients/cli/src/main/java/org/apache/gravitino/cli/FullName.java
+++ b/clients/cli/src/main/java/org/apache/gravitino/cli/FullName.java
@@ -106,6 +106,15 @@ public class FullName {
return getNamePart(2);
}
+ /**
+ * Retrieves the topic name from the third part of the full name option.
+ *
+ * @return The topic name, or null if not found.
+ */
+ public String getTopicName() {
+ return getNamePart(2);
+ }
+
/**
* Retrieves the fileset name from the third part of the full name option.
*
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoCommandLine.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoCommandLine.java
index cc2147c02..3d7f9fb57 100644
---
a/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoCommandLine.java
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoCommandLine.java
@@ -124,6 +124,8 @@ public class GravitinoCommandLine extends
TestableCommandLine {
handleCatalogCommand();
} else if (entity.equals(CommandEntities.METALAKE)) {
handleMetalakeCommand();
+ } else if (entity.equals(CommandEntities.TOPIC)) {
+ handleTopicCommand();
} else if (entity.equals(CommandEntities.FILESET)) {
handleFilesetCommand();
} else if (entity.equals(CommandEntities.USER)) {
@@ -492,7 +494,36 @@ public class GravitinoCommandLine extends
TestableCommandLine {
}
/**
- * Handles the command execution for filesets based on command type and the
command line options.
+ * Handles the command execution for topics based on command type and the
command line options.
+ */
+ private void handleTopicCommand() {
+ String url = getUrl();
+ FullName name = new FullName(line);
+ String metalake = name.getMetalakeName();
+ String catalog = name.getCatalogName();
+ String schema = name.getSchemaName();
+ String topic = name.getTopicName();
+
+ if (CommandActions.LIST.equals(command)) {
+ newListTopics(url, ignore, metalake, catalog, schema).handle();
+ } else if (CommandActions.DETAILS.equals(command)) {
+ newTopicDetails(url, ignore, metalake, catalog, schema, topic).handle();
+ } else if (CommandActions.CREATE.equals(command)) {
+ String comment = line.getOptionValue(GravitinoOptions.COMMENT);
+ newCreateTopic(url, ignore, metalake, catalog, schema, topic,
comment).handle();
+ } else if (CommandActions.DELETE.equals(command)) {
+ boolean force = line.hasOption(GravitinoOptions.FORCE);
+ newDeleteTopic(url, ignore, force, metalake, catalog, schema,
topic).handle();
+ } else if (CommandActions.UPDATE.equals(command)) {
+ if (line.hasOption(GravitinoOptions.COMMENT)) {
+ String comment = line.getOptionValue(GravitinoOptions.COMMENT);
+ newUpdateTopicComment(url, ignore, metalake, catalog, schema, topic,
comment).handle();
+ }
+ }
+ }
+
+ /**
+ * Handles the command execution for Filesets based on command type and the
command line options.
*/
private void handleFilesetCommand() {
String url = getUrl();
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoOptions.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoOptions.java
index ee3e09472..fa5ac5a52 100644
--- a/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoOptions.java
+++ b/clients/cli/src/main/java/org/apache/gravitino/cli/GravitinoOptions.java
@@ -43,8 +43,8 @@ public class GravitinoOptions {
public static final String OWNER = "owner";
public static final String ROLE = "role";
public static final String AUDIT = "audit";
- public static final String INDEX = "index";
public static final String FORCE = "force";
+ public static final String INDEX = "index";
public static final String DISTRIBUTION = "distribution";
public static final String PARTITION = "partition";
@@ -79,16 +79,14 @@ public class GravitinoOptions {
"z", PROVIDER, "provider one of hadoop, hive, mysql, postgres,
iceberg, kafka"));
options.addOption(createArgOption("l", USER, "user name"));
options.addOption(createArgOption("g", GROUP, "group name"));
- options.addOption(createArgOption("t", TAG, "tag name"));
- options.addOption(createSimpleOption("o", OWNER, "entity owner"));
+ options.addOption(createSimpleOption("o", OWNER, "display entity owner"));
options.addOption(createArgOption("r", ROLE, "role name"));
- // Properties option can have multiple values
- Option properties =
- Option.builder("p").longOpt(PROPERTIES).desc("property name/value
pairs").hasArgs().build();
- options.addOption(properties);
+ // Properties and tags can have multiple values
+ options.addOption(createArgsOption("p", PROPERTIES, "property name/value
pairs"));
+ options.addOption(createArgsOption("t", TAG, "tag name"));
- // Force delete entity and rename metalake operations
+ // Force delete entities and rename metalake operations
options.addOption(createSimpleOption("f", FORCE, "force operation"));
return options;
@@ -118,6 +116,14 @@ public class GravitinoOptions {
return new Option(shortName, longName, true, description);
}
+ /**
+ * Helper method to create an Option that accepts multiple arguments.
+ *
+ * @param shortName The option name as a single letter
+ * @param longName The long option name.
+ * @param description The option description.
+ * @return The Option object.
+ */
public Option createArgsOption(String shortName, String longName, String
description) {
// Support multiple arguments
return
Option.builder().option(shortName).longOpt(longName).hasArgs().desc(description).build();
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/TestableCommandLine.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/TestableCommandLine.java
index 1df28658b..8f610679c 100644
---
a/clients/cli/src/main/java/org/apache/gravitino/cli/TestableCommandLine.java
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/TestableCommandLine.java
@@ -33,6 +33,7 @@ import org.apache.gravitino.cli.commands.CreateMetalake;
import org.apache.gravitino.cli.commands.CreateRole;
import org.apache.gravitino.cli.commands.CreateSchema;
import org.apache.gravitino.cli.commands.CreateTag;
+import org.apache.gravitino.cli.commands.CreateTopic;
import org.apache.gravitino.cli.commands.CreateUser;
import org.apache.gravitino.cli.commands.DeleteCatalog;
import org.apache.gravitino.cli.commands.DeleteFileset;
@@ -42,6 +43,7 @@ import org.apache.gravitino.cli.commands.DeleteRole;
import org.apache.gravitino.cli.commands.DeleteSchema;
import org.apache.gravitino.cli.commands.DeleteTable;
import org.apache.gravitino.cli.commands.DeleteTag;
+import org.apache.gravitino.cli.commands.DeleteTopic;
import org.apache.gravitino.cli.commands.DeleteUser;
import org.apache.gravitino.cli.commands.FilesetDetails;
import org.apache.gravitino.cli.commands.GroupDetails;
@@ -60,6 +62,7 @@ import org.apache.gravitino.cli.commands.ListSchema;
import org.apache.gravitino.cli.commands.ListSchemaProperties;
import org.apache.gravitino.cli.commands.ListTables;
import org.apache.gravitino.cli.commands.ListTagProperties;
+import org.apache.gravitino.cli.commands.ListTopics;
import org.apache.gravitino.cli.commands.ListUsers;
import org.apache.gravitino.cli.commands.MetalakeAudit;
import org.apache.gravitino.cli.commands.MetalakeDetails;
@@ -85,6 +88,7 @@ import org.apache.gravitino.cli.commands.TableDistribution;
import org.apache.gravitino.cli.commands.TablePartition;
import org.apache.gravitino.cli.commands.TagDetails;
import org.apache.gravitino.cli.commands.TagEntity;
+import org.apache.gravitino.cli.commands.TopicDetails;
import org.apache.gravitino.cli.commands.UntagEntity;
import org.apache.gravitino.cli.commands.UpdateCatalogComment;
import org.apache.gravitino.cli.commands.UpdateCatalogName;
@@ -92,6 +96,7 @@ import
org.apache.gravitino.cli.commands.UpdateMetalakeComment;
import org.apache.gravitino.cli.commands.UpdateMetalakeName;
import org.apache.gravitino.cli.commands.UpdateTagComment;
import org.apache.gravitino.cli.commands.UpdateTagName;
+import org.apache.gravitino.cli.commands.UpdateTopicComment;
import org.apache.gravitino.cli.commands.UserDetails;
/*
@@ -447,6 +452,49 @@ public class TestableCommandLine {
return new OwnerDetails(url, ignore, metalake, entity, entityType);
}
+ protected ListTopics newListTopics(
+ String url, boolean ignore, String metalake, String catalog, String
schema) {
+ return new ListTopics(url, ignore, metalake, catalog, schema);
+ }
+
+ protected TopicDetails newTopicDetails(
+ String url, boolean ignore, String metalake, String catalog, String
schema, String topic) {
+ return new TopicDetails(url, ignore, metalake, catalog, schema, topic);
+ }
+
+ protected CreateTopic newCreateTopic(
+ String url,
+ boolean ignore,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic,
+ String comment) {
+ return new CreateTopic(url, ignore, metalake, catalog, schema, topic,
comment);
+ }
+
+ protected DeleteTopic newDeleteTopic(
+ String url,
+ boolean ignore,
+ boolean force,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic) {
+ return new DeleteTopic(url, ignore, force, metalake, catalog, schema,
topic);
+ }
+
+ protected UpdateTopicComment newUpdateTopicComment(
+ String url,
+ boolean ignore,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic,
+ String comment) {
+ return new UpdateTopicComment(url, ignore, metalake, catalog, schema,
topic, comment);
+ }
+
protected FilesetDetails newFilesetDetails(
String url, boolean ignore, String metalake, String catalog, String
schema, String fileset) {
return new FilesetDetails(url, ignore, metalake, catalog, schema, fileset);
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/CreateTopic.java
similarity index 50%
copy from
clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
copy to
clients/cli/src/main/java/org/apache/gravitino/cli/commands/CreateTopic.java
index 330574985..955d1e289 100644
---
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/CreateTopic.java
@@ -19,64 +19,71 @@
package org.apache.gravitino.cli.commands;
-import java.util.Arrays;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.cli.ErrorMessages;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
-/** Displays the index of a table. */
-public class ListIndexes extends TableCommand {
+/** Add a topic. */
+public class CreateTopic extends Command {
+ protected final String metalake;
+ protected final String catalog;
protected final String schema;
- protected final String table;
+ protected final String topic;
+ protected final String comment;
/**
- * Displays the index of a table.
+ * Add a topic.
*
* @param url The URL of the Gravitino server.
* @param ignoreVersions If true don't check the client/server versions
match.
* @param metalake The name of the metalake.
* @param catalog The name of the catalog.
* @param schema The name of the schema.
- * @param table The name of the table.
+ * @param topic The name of the topic.
+ * @param comment The comment for the topic.
*/
- public ListIndexes(
+ public CreateTopic(
String url,
boolean ignoreVersions,
String metalake,
String catalog,
String schema,
- String table) {
- super(url, ignoreVersions, metalake, catalog);
+ String topic,
+ String comment) {
+ super(url, ignoreVersions);
+ this.metalake = metalake;
+ this.catalog = catalog;
this.schema = schema;
- this.table = table;
+ this.topic = topic;
+ this.comment = comment;
}
- /** Displays the details of a table's index. */
+ /** Add a topic. */
+ @Override
public void handle() {
- Index[] indexes;
+ NameIdentifier name = NameIdentifier.of(schema, topic);
try {
- NameIdentifier name = NameIdentifier.of(schema, table);
- indexes = tableCatalog().loadTable(name).index();
+ GravitinoClient client = buildClient(metalake);
+ client.loadCatalog(catalog).asTopicCatalog().createTopic(name, comment,
null, null);
+ } catch (NoSuchMetalakeException err) {
+ System.err.println(ErrorMessages.UNKNOWN_METALAKE);
+ return;
+ } catch (NoSuchSchemaException err) {
+ System.err.println(ErrorMessages.UNKNOWN_SCHEMA);
+ return;
+ } catch (TopicAlreadyExistsException err) {
+ System.err.println(ErrorMessages.TOPIC_EXISTS);
+ return;
} catch (Exception exp) {
System.err.println(exp.getMessage());
return;
}
- StringBuilder all = new StringBuilder();
- for (Index index : indexes) {
- // Flatten nested field names into dot-separated strings (e.g., "a.b.c")
- Arrays.stream(index.fieldNames())
- // Convert nested fields to a single string
- .map(nestedFieldName -> String.join(".", nestedFieldName))
- .forEach(
- fieldName ->
- all.append(fieldName)
- .append(",")
- .append(index.name())
- .append(System.lineSeparator()));
- }
-
- System.out.print(all);
+ System.out.println(topic + " topic created.");
}
}
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/DeleteTopic.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/DeleteTopic.java
new file mode 100644
index 000000000..1d1730652
--- /dev/null
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/DeleteTopic.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cli.commands;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.cli.AreYouSure;
+import org.apache.gravitino.cli.ErrorMessages;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTopicException;
+
+/** Delete a topic. */
+public class DeleteTopic extends Command {
+
+ protected final String metalake;
+ protected final String catalog;
+ protected final String schema;
+ protected final String topic;
+ protected final boolean force;
+
+ /**
+ * Delete a topic.
+ *
+ * @param url The URL of the Gravitino server.
+ * @param ignoreVersions If true don't check the client/server versions
match.
+ * @param force Force operation.
+ * @param metalake The name of the metalake.
+ * @param catalog The name of the catalog.
+ * @param schema The name of the schema.
+ * @param topic The name of the topic.
+ */
+ public DeleteTopic(
+ String url,
+ boolean ignoreVersions,
+ boolean force,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic) {
+ super(url, ignoreVersions);
+ this.force = force;
+ this.metalake = metalake;
+ this.catalog = catalog;
+ this.schema = schema;
+ this.topic = topic;
+ }
+
+ /** Delete a topic. */
+ @Override
+ public void handle() {
+ NameIdentifier name = NameIdentifier.of(schema, topic);
+ boolean deleted = false;
+
+ if (!AreYouSure.really(force)) {
+ return;
+ }
+
+ try {
+ GravitinoClient client = buildClient(metalake);
+ deleted = client.loadCatalog(catalog).asTopicCatalog().dropTopic(name);
+ } catch (NoSuchMetalakeException err) {
+ System.err.println(ErrorMessages.UNKNOWN_METALAKE);
+ return;
+ } catch (NoSuchSchemaException err) {
+ System.err.println(ErrorMessages.UNKNOWN_SCHEMA);
+ return;
+ } catch (NoSuchTopicException err) {
+ System.err.println(ErrorMessages.UNKNOWN_TOPIC);
+ return;
+ } catch (Exception exp) {
+ System.err.println(exp.getMessage());
+ return;
+ }
+
+ if (deleted) {
+ System.out.println(topic + " deleted.");
+ } else {
+ System.out.println(topic + " not deleted.");
+ }
+ }
+}
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
index 330574985..7178f8753 100644
---
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
@@ -52,6 +52,7 @@ public class ListIndexes extends TableCommand {
}
/** Displays the details of a table's index. */
+ @Override
public void handle() {
Index[] indexes;
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListTopics.java
similarity index 53%
copy from
clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
copy to
clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListTopics.java
index 330574985..46416ff38 100644
---
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListIndexes.java
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/ListTopics.java
@@ -19,64 +19,56 @@
package org.apache.gravitino.cli.commands;
+import com.google.common.base.Joiner;
import java.util.Arrays;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.cli.ErrorMessages;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
-/** Displays the index of a table. */
-public class ListIndexes extends TableCommand {
+/** List the topics. */
+public class ListTopics extends Command {
+ protected final String metalake;
+ protected final String catalog;
protected final String schema;
- protected final String table;
/**
- * Displays the index of a table.
+ * List the names of all topics in a schema.
*
* @param url The URL of the Gravitino server.
* @param ignoreVersions If true don't check the client/server versions
match.
* @param metalake The name of the metalake.
* @param catalog The name of the catalog.
* @param schema The name of the schema.
- * @param table The name of the table.
*/
- public ListIndexes(
- String url,
- boolean ignoreVersions,
- String metalake,
- String catalog,
- String schema,
- String table) {
- super(url, ignoreVersions, metalake, catalog);
+ public ListTopics(
+ String url, boolean ignoreVersions, String metalake, String catalog,
String schema) {
+ super(url, ignoreVersions);
+ this.metalake = metalake;
+ this.catalog = catalog;
this.schema = schema;
- this.table = table;
}
- /** Displays the details of a table's index. */
+ /** List the names of all topics in a schema. */
+ @Override
public void handle() {
- Index[] indexes;
+ NameIdentifier[] topics = new NameIdentifier[0];
+ Namespace name = Namespace.of(schema);
try {
- NameIdentifier name = NameIdentifier.of(schema, table);
- indexes = tableCatalog().loadTable(name).index();
+ GravitinoClient client = buildClient(metalake);
+ topics = client.loadCatalog(catalog).asTopicCatalog().listTopics(name);
+ } catch (NoSuchMetalakeException err) {
+ System.err.println(ErrorMessages.UNKNOWN_METALAKE);
+ return;
} catch (Exception exp) {
System.err.println(exp.getMessage());
return;
}
- StringBuilder all = new StringBuilder();
- for (Index index : indexes) {
- // Flatten nested field names into dot-separated strings (e.g., "a.b.c")
- Arrays.stream(index.fieldNames())
- // Convert nested fields to a single string
- .map(nestedFieldName -> String.join(".", nestedFieldName))
- .forEach(
- fieldName ->
- all.append(fieldName)
- .append(",")
- .append(index.name())
- .append(System.lineSeparator()));
- }
-
- System.out.print(all);
+ String all = Joiner.on(",").join(Arrays.stream(topics).map(topic ->
topic.name()).iterator());
+ System.out.println(all);
}
}
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/TopicDetails.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/TopicDetails.java
new file mode 100644
index 000000000..0e73bb845
--- /dev/null
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/TopicDetails.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cli.commands;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.cli.ErrorMessages;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTopicException;
+import org.apache.gravitino.messaging.Topic;
+
+/** Displays the details of a topic. */
+public class TopicDetails extends Command {
+
+ protected final String metalake;
+ protected final String catalog;
+ protected final String schema;
+ protected final String topic;
+
+ /**
+ * Displays the details of a topic.
+ *
+ * @param url The URL of the Gravitino server.
+ * @param ignoreVersions If true don't check the client/server versions
match.
+ * @param metalake The name of the metalake.
+ * @param catalog The name of the catalog.
+ * @param schema The name of the schema.
+ * @param topic The name of the topic.
+ */
+ public TopicDetails(
+ String url,
+ boolean ignoreVersions,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic) {
+ super(url, ignoreVersions);
+ this.metalake = metalake;
+ this.catalog = catalog;
+ this.schema = schema;
+ this.topic = topic;
+ }
+
+ /** Displays the details of a topic. */
+ @Override
+ public void handle() {
+ Topic gTopic = null;
+
+ try {
+ NameIdentifier name = NameIdentifier.of(schema, topic);
+ GravitinoClient client = buildClient(metalake);
+ gTopic = client.loadCatalog(catalog).asTopicCatalog().loadTopic(name);
+ } catch (NoSuchMetalakeException err) {
+ System.err.println(ErrorMessages.UNKNOWN_METALAKE);
+ return;
+ } catch (NoSuchCatalogException err) {
+ System.err.println(ErrorMessages.UNKNOWN_CATALOG);
+ return;
+ } catch (NoSuchSchemaException err) {
+ System.err.println(ErrorMessages.UNKNOWN_SCHEMA);
+ return;
+ } catch (NoSuchTopicException err) {
+ System.err.println(ErrorMessages.UNKNOWN_TOPIC);
+ return;
+ } catch (Exception exp) {
+ System.err.println(exp.getMessage());
+ return;
+ }
+
+ System.out.println(gTopic.name() + "," + gTopic.comment());
+ }
+}
diff --git
a/clients/cli/src/main/java/org/apache/gravitino/cli/commands/UpdateTopicComment.java
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/UpdateTopicComment.java
new file mode 100644
index 000000000..d57efcaf9
--- /dev/null
+++
b/clients/cli/src/main/java/org/apache/gravitino/cli/commands/UpdateTopicComment.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cli.commands;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.cli.ErrorMessages;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NoSuchTopicException;
+import org.apache.gravitino.messaging.TopicChange;
+
+/** Update the comment of a topic. */
+public class UpdateTopicComment extends Command {
+
+ protected final String metalake;
+ protected final String catalog;
+ protected final String schema;
+ protected final String topic;
+ protected final String comment;
+
+ /**
+ * Update the comment of a topic.
+ *
+ * @param url The URL of the Gravitino server.
+ * @param ignoreVersions If true don't check the client/server versions
match.
+ * @param metalake The name of the metalake.
+ * @param catalog The name of the catalog.
+ * @param schema The name of the schema.
+ * @param topic The name of the topic.
+ * @param comment New topic comment.
+ */
+ public UpdateTopicComment(
+ String url,
+ boolean ignoreVersions,
+ String metalake,
+ String catalog,
+ String schema,
+ String topic,
+ String comment) {
+ super(url, ignoreVersions);
+ this.metalake = metalake;
+ this.catalog = catalog;
+ this.schema = schema;
+ this.topic = topic;
+ this.comment = comment;
+ }
+
+ /** Update the comment of a topic. */
+ @Override
+ public void handle() {
+ NameIdentifier name = NameIdentifier.of(schema, topic);
+
+ try {
+ GravitinoClient client = buildClient(metalake);
+ TopicChange change = TopicChange.updateComment(comment);
+ client.loadCatalog(catalog).asTopicCatalog().alterTopic(name, change);
+ } catch (NoSuchMetalakeException err) {
+ System.err.println(ErrorMessages.UNKNOWN_METALAKE);
+ return;
+ } catch (NoSuchCatalogException err) {
+ System.err.println(ErrorMessages.UNKNOWN_CATALOG);
+ return;
+ } catch (NoSuchSchemaException err) {
+ System.err.println(ErrorMessages.UNKNOWN_SCHEMA);
+ return;
+ } catch (NoSuchTopicException err) {
+ System.err.println(ErrorMessages.UNKNOWN_TOPIC);
+ return;
+ } catch (Exception exp) {
+ System.err.println(exp.getMessage());
+ return;
+ }
+
+ System.out.println(topic + " comment changed.");
+ }
+}
diff --git
a/clients/cli/src/test/java/org/apache/gravitino/cli/TestTopicCommands.java
b/clients/cli/src/test/java/org/apache/gravitino/cli/TestTopicCommands.java
new file mode 100644
index 000000000..9e69b09ec
--- /dev/null
+++ b/clients/cli/src/test/java/org/apache/gravitino/cli/TestTopicCommands.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.cli;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.gravitino.cli.commands.CreateTopic;
+import org.apache.gravitino.cli.commands.DeleteTopic;
+import org.apache.gravitino.cli.commands.ListTopics;
+import org.apache.gravitino.cli.commands.TopicDetails;
+import org.apache.gravitino.cli.commands.UpdateTopicComment;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Verifies that topic entity commands parsed by {@link GravitinoCommandLine} are routed to the
+ * matching command object and that the command's {@code handle()} method is invoked.
+ *
+ * <p>Each test stubs the parsed command line, spies on {@link GravitinoCommandLine} so the command
+ * factory method returns a mock for the expected arguments, and then checks the mock was handled.
+ */
+class TestTopicCommands {
+  private CommandLine mockCommandLine;
+  private Options mockOptions;
+
+  /** Creates fresh command line and options mocks before every test. */
+  @BeforeEach
+  void setUp() {
+    mockCommandLine = mock(CommandLine.class);
+    mockOptions = mock(Options.class);
+  }
+
+  /** A {@code topic list} command with a catalog.schema name creates and runs ListTopics. */
+  @Test
+  void testListTopicsCommand() {
+    ListTopics mockList = mock(ListTopics.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    // Option values are keyed by GravitinoOptions constants, matching the hasOption stub above.
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema");
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.LIST));
+    doReturn(mockList)
+        .when(commandLine)
+        .newListTopics(
+            GravitinoCommandLine.DEFAULT_URL, false, "metalake_demo", "catalog", "schema");
+    commandLine.handleCommandLine();
+    verify(mockList).handle();
+  }
+
+  /** A {@code topic details} command creates and runs TopicDetails for the named topic. */
+  @Test
+  void testTopicDetailsCommand() {
+    TopicDetails mockDetails = mock(TopicDetails.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema.topic");
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.DETAILS));
+    doReturn(mockDetails)
+        .when(commandLine)
+        .newTopicDetails(
+            GravitinoCommandLine.DEFAULT_URL, false, "metalake_demo", "catalog", "schema", "topic");
+    commandLine.handleCommandLine();
+    verify(mockDetails).handle();
+  }
+
+  /** A {@code topic create} command passes the name parts and comment to CreateTopic. */
+  @Test
+  void testCreateTopicCommand() {
+    CreateTopic mockCreate = mock(CreateTopic.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema.topic");
+    when(mockCommandLine.hasOption(GravitinoOptions.COMMENT)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.COMMENT)).thenReturn("comment");
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.CREATE));
+    doReturn(mockCreate)
+        .when(commandLine)
+        .newCreateTopic(
+            GravitinoCommandLine.DEFAULT_URL,
+            false,
+            "metalake_demo",
+            "catalog",
+            "schema",
+            "topic",
+            "comment");
+    commandLine.handleCommandLine();
+    verify(mockCreate).handle();
+  }
+
+  /** A {@code topic delete} command without the force option passes {@code force = false}. */
+  @Test
+  void testDeleteTopicCommand() {
+    DeleteTopic mockDelete = mock(DeleteTopic.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema.topic");
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.DELETE));
+    doReturn(mockDelete)
+        .when(commandLine)
+        .newDeleteTopic(
+            GravitinoCommandLine.DEFAULT_URL,
+            false,
+            false,
+            "metalake_demo",
+            "catalog",
+            "schema",
+            "topic");
+    commandLine.handleCommandLine();
+    verify(mockDelete).handle();
+  }
+
+  /** A {@code topic delete} command with the force option passes {@code force = true}. */
+  @Test
+  void testDeleteTopicForceCommand() {
+    DeleteTopic mockDelete = mock(DeleteTopic.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema.topic");
+    when(mockCommandLine.hasOption(GravitinoOptions.FORCE)).thenReturn(true);
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.DELETE));
+    doReturn(mockDelete)
+        .when(commandLine)
+        .newDeleteTopic(
+            GravitinoCommandLine.DEFAULT_URL,
+            false,
+            true,
+            "metalake_demo",
+            "catalog",
+            "schema",
+            "topic");
+    commandLine.handleCommandLine();
+    verify(mockDelete).handle();
+  }
+
+  /** A {@code topic update} command with a comment creates and runs UpdateTopicComment. */
+  @Test
+  void testUpdateCommentTopicCommand() {
+    UpdateTopicComment mockUpdate = mock(UpdateTopicComment.class);
+    when(mockCommandLine.hasOption(GravitinoOptions.METALAKE)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.METALAKE)).thenReturn("metalake_demo");
+    when(mockCommandLine.hasOption(GravitinoOptions.NAME)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.NAME)).thenReturn("catalog.schema.topic");
+    when(mockCommandLine.hasOption(GravitinoOptions.COMMENT)).thenReturn(true);
+    when(mockCommandLine.getOptionValue(GravitinoOptions.COMMENT)).thenReturn("new comment");
+    GravitinoCommandLine commandLine =
+        spy(
+            new GravitinoCommandLine(
+                mockCommandLine, mockOptions, CommandEntities.TOPIC, CommandActions.UPDATE));
+    doReturn(mockUpdate)
+        .when(commandLine)
+        .newUpdateTopicComment(
+            GravitinoCommandLine.DEFAULT_URL,
+            false,
+            "metalake_demo",
+            "catalog",
+            "schema",
+            "topic",
+            "new comment");
+    commandLine.handleCommandLine();
+    verify(mockUpdate).handle();
+  }
+}
diff --git a/docs/cli.md b/docs/cli.md
index 856e591a4..15693bfc6 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -10,7 +10,7 @@ license: 'This software is licensed under the Apache License
version 2.'
This document provides guidance on managing metadata within Apache Gravitino
using the Command Line Interface (CLI). The CLI offers a terminal based
alternative to using code or the REST interface for metadata management.
-Currently, the CLI allows users to view metadata information for metalakes,
catalogs, schemas, tables, users groups and tags. Future updates will expand on
these capabilities to include roles, topics and filesets.
+Currently, the CLI allows users to view metadata information for metalakes,
catalogs, schemas, tables, users, roles, groups, tags and topics. Future
updates will expand on these capabilities.
## Running the CLI
@@ -28,7 +28,7 @@ The general structure for running commands with the Gravitino
CLI is `gcli entit
```bash
[options]
- usage: gcli [metalake|catalog|schema|table|column|user|group|tag|fileset]
[list|details|create|delete|update|set|remove|properties|revoke|grant] [options]
+ usage: gcli
[metalake|catalog|schema|table|column|user|group|tag|topic|fileset]
[list|details|create|delete|update|set|remove|properties|revoke|grant] [options]
Options
-a,--audit display audit information
-c,--comment <arg> entity comment
@@ -40,7 +40,7 @@ The general structure for running commands with the Gravitino
CLI is `gcli entit
-l,--user <arg> user name
-m,--metalake <arg> metalake name
-n,--name <arg> full entity name (dot separated)
- -o,--owner entity owner
+ -o,--owner display entity owner
-P,--property <arg> property name
-p,--properties <arg> property name/value pairs
--partition display partition information
@@ -451,7 +451,7 @@ gcli group list
```bash
gcli group delete --group new_group
- ```
+```
### Tag commands
@@ -596,6 +596,38 @@ gcli group grant --group groupA --role admin
gcli group revoke --group groupA --role admin
```
+### Topic commands
+
+#### Display a topic's details
+
+```bash
+gcli topic details --name kafka.default.topic3
+```
+
+#### Create a topic
+
+```bash
+gcli topic create --name kafka.default.topic3
+```
+
+#### List all topics
+
+```bash
+gcli topic list --name kafka.default
+```
+
+#### Delete a topic
+
+```bash
+gcli topic delete --name kafka.default.topic3
+```
+
+#### Change a topic's comment
+
+```bash
+gcli topic update --name kafka.default.topic3 --comment new_comment
+```
+
### Fileset commands
#### Create a fileset