showuon commented on code in PR #12103:
URL: https://github.com/apache/kafka/pull/12103#discussion_r953271661
##########
shell/src/test/java/org/apache/kafka/shell/MetadataNodeManagerTest.java:
##########
@@ -336,4 +403,43 @@ public void testProducerIdsRecord() {
11000 + "",
metadataNodeManager.getData().root().directory("producerIds").file("nextBlockStartId").contents());
}
+
+ @Test
+ public void
testAccessControlEntryRecordAndRemoveAccessControlEntryRecord() {
+ AccessControlEntryRecord record1 = new AccessControlEntryRecord()
+ .setId(Uuid.fromString("GcaQDl2UTsCNs1p9s37XkQ"))
Review Comment:
nit: since this is just for test, we can use `Uuid.ZERO_UUID` for testing,
to avoid random strings here.
##########
shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java:
##########
@@ -333,6 +357,36 @@ private void handleCommitImpl(MetadataRecordType type,
ApiMessage message)
producerIds.create("nextBlockStartId").setContents(record.nextProducerId() +
"");
break;
}
+ case ACCESS_CONTROL_ENTRY_RECORD: {
+ AccessControlEntryRecord record = (AccessControlEntryRecord)
message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ FileNode file = acls.create(record.id().toString());
+
file.setContents(AccessControlEntryRecordJsonConverter.write(record,
+
AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ break;
+ }
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
+ RemoveAccessControlEntryRecord record =
(RemoveAccessControlEntryRecord) message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ acls.rmrf(record.id().toString());
+ break;
+ }
+ case FEATURE_LEVEL_RECORD: {
+ FeatureLevelRecord record = (FeatureLevelRecord) message;
+ DirectoryNode features = data.root.mkdirs("features");
+ if (record.featureLevel() == 0) {
+ features.rmrf(record.name());
Review Comment:
Sorry, could you let me know where we mentioned `featureLevel == 0` means
delete it? I checked the record schema, it didn't mention this:
https://github.com/apache/kafka/blob/4b310d1fe171d6d5edda13bb1baf2cf5a0d8eb68/metadata/src/main/resources/common/metadata/FeatureLevelRecord.json#L17-L26
##########
shell/src/main/java/org/apache/kafka/shell/MetadataNodeManager.java:
##########
@@ -333,6 +357,36 @@ private void handleCommitImpl(MetadataRecordType type,
ApiMessage message)
producerIds.create("nextBlockStartId").setContents(record.nextProducerId() +
"");
break;
}
+ case ACCESS_CONTROL_ENTRY_RECORD: {
+ AccessControlEntryRecord record = (AccessControlEntryRecord)
message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ FileNode file = acls.create(record.id().toString());
+
file.setContents(AccessControlEntryRecordJsonConverter.write(record,
+
AccessControlEntryRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ break;
+ }
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD: {
+ RemoveAccessControlEntryRecord record =
(RemoveAccessControlEntryRecord) message;
+ DirectoryNode acls = data.root.mkdirs("acl").mkdirs("id");
+ acls.rmrf(record.id().toString());
+ break;
+ }
+ case FEATURE_LEVEL_RECORD: {
+ FeatureLevelRecord record = (FeatureLevelRecord) message;
+ DirectoryNode features = data.root.mkdirs("features");
+ if (record.featureLevel() == 0) {
+ features.rmrf(record.name());
+ } else {
+ FileNode file = features.create(record.name());
+
file.setContents(FeatureLevelRecordJsonConverter.write(record,
+
FeatureLevelRecord.HIGHEST_SUPPORTED_VERSION).toPrettyString());
+ }
+
Review Comment:
nit: additional empty line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]