rondagostino commented on code in PR #13686: URL: https://github.com/apache/kafka/pull/13686#discussion_r1190240987
########## metadata/src/main/java/org/apache/kafka/image/node/ProducerIdsImageNode.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.image.ProducerIdsImage; + +import java.util.Collection; +import java.util.Collections; + + +public class ProducerIdsImageNode implements MetadataNode { + /** + * The name of this node. + */ + public static final String NAME = "producerIds"; + + /** + * The producer IDs image. + */ + private final ProducerIdsImage image; + + public ProducerIdsImageNode(ProducerIdsImage image) { + this.image = image; + } + + @Override + public Collection<String> childNames() { + return Collections.singletonList("nextProducerId"); + } + + @Override + public MetadataNode child(String name) { + if (name.equals("nextProducerId")) { + return new MetadataLeafNode(image.highestSeenProducerId() + ""); Review Comment: Is the highest seen the next one, or is the highest seen 1 less than the next one? It seems this code as written is correct (i.e. the highest seen the next one) since the original `toString()` method was invoking `"ProducerIdsImage(highestSeenProducerId=" + nextProducerId + ")"`. But just want to confirm. ########## metadata/src/main/java/org/apache/kafka/image/node/MetadataImageNode.java: ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.image.MetadataImage; + +import java.util.Arrays; +import java.util.Collection; + + +public class MetadataImageNode implements MetadataNode { + /** + * The name of this node. + */ + public final static String NAME = "image"; + + /** + * The metadata image. + */ + private final MetadataImage image; + + public MetadataImageNode(MetadataImage image) { + this.image = image; + } + + public MetadataImage image() { + return image; + } + + @Override + public Collection<String> childNames() { + return Arrays.asList( + ProvenanceNode.NAME, + FeaturesImageNode.NAME, + ClusterImageNode.NAME, + TopicsImageNode.NAME, + ConfigurationsImageNode.NAME, + ClientQuotasImageNode.NAME, + ProducerIdsImageNode.NAME, + AclsImageNode.NAME, + ScramImageNode.NAME + ); + } + + @Override + public MetadataNode child(String name) { Review Comment: Since we have to keep these two methods in sync, is it worth setting up a static map and using that? Perhaps not since the child names have to be sorted... but figured I would raise it as a possibility. ########## metadata/src/main/java/org/apache/kafka/image/node/ClientQuotasImageNode.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.image.ClientQuotaImage; +import org.apache.kafka.image.ClientQuotasImage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.kafka.common.quota.ClientQuotaEntity.CLIENT_ID; +import static org.apache.kafka.common.quota.ClientQuotaEntity.IP; +import static org.apache.kafka.common.quota.ClientQuotaEntity.USER; + + +public class ClientQuotasImageNode implements MetadataNode { + /** + * The name of this node. + */ + public static final String NAME = "clientQuotas"; + + /** + * The topics image. + */ + private final ClientQuotasImage image; + + public ClientQuotasImageNode(ClientQuotasImage image) { + this.image = image; + } + + @Override + public Collection<String> childNames() { + ArrayList<String> childNames = new ArrayList<>(); + for (ClientQuotaEntity entity : image.entities().keySet()) { + childNames.add(clientQuotaEntityToString(entity)); + } + return childNames; + } + + static String clientQuotaEntityToString(ClientQuotaEntity entity) { + if (entity.entries().isEmpty()) { + throw new RuntimeException("Invalid empty entity"); + } Review Comment: This and `decodeEntity()` below need to be exercised in a test. In fact, I think it would be good to exercise all of the updated `toString()` methods in a test. ########## metadata/src/main/java/org/apache/kafka/image/node/ScramCredentialDataNode.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.image.node.printer.MetadataNodePrinter; +import org.apache.kafka.metadata.ScramCredentialData; + + +public class ScramCredentialDataNode implements MetadataNode { + private final ScramCredentialData data; + + public ScramCredentialDataNode(ScramCredentialData data) { + this.data = data; + } + + @Override + public boolean isDirectory() { + return false; + } + + @Override + public void print(MetadataNodePrinter printer) { + StringBuilder bld = new StringBuilder(); + bld.append("ScramCredentialData"); + bld.append("(salt="); + if (printer.redactionCriteria().shouldRedactScram()) { + bld.append("[redacted]"); + } else { + bld.append(Bytes.wrap(data.salt()).toString()); + } + bld.append(", storedKey="); + if (printer.redactionCriteria().shouldRedactScram()) { + bld.append("[redacted]"); + } else { + bld.append(Bytes.wrap(data.storedKey()).toString()); + } + bld.append(", serverKey="); + if (printer.redactionCriteria().shouldRedactScram()) { + bld.append("[redacted]"); + } else { + bld.append(Bytes.wrap(data.serverKey()).toString()); + } + bld.append(", iterations="); + if (printer.redactionCriteria().shouldRedactScram()) { + bld.append("[redacted]"); Review Comment: Need a test to lock in this redaction ########## metadata/src/main/java/org/apache/kafka/image/node/ConfigurationImageNode.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.image.ConfigurationImage; +import org.apache.kafka.image.node.printer.MetadataNodePrinter; + +import java.util.Collection; + + +public class ConfigurationImageNode implements MetadataNode { + /** + * The configuration image for a specific resource. + */ + private final ConfigurationImage image; + + public ConfigurationImageNode(ConfigurationImage image) { + this.image = image; + } + + @Override + public Collection<String> childNames() { + return image.data().keySet(); + } + + @Override + public MetadataNode child(String name) { + String value = image.data().get(name); + if (value == null) return null; + return new MetadataNode() { + @Override + public boolean isDirectory() { + return false; + } + + @Override + public void print(MetadataNodePrinter printer) { + if (printer.redactionCriteria(). Review Comment: Need a test to lock in the fact that this redaction will occur. ########## metadata/src/main/java/org/apache/kafka/image/node/printer/MetadataNodeRedactionCriteria.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node.printer; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.metadata.KafkaConfigSchema; + + +public interface MetadataNodeRedactionCriteria { + /** + * Returns true if SCRAM data should be redacted. + */ + boolean shouldRedactScram(); + + /** + * Returns true if a configuration should be redacted. + * + * @param type The configuration type. + * @param key The configuration key. + * + * @return True if the configuration should be redacted. + */ + boolean shouldRedactConfig(ConfigResource.Type type, String key); + + class Strict implements MetadataNodeRedactionCriteria { Review Comment: Can we write tests to ensure that `Strict`, `Disabled`, and `Normal` act as intended? ########## metadata/src/main/java/org/apache/kafka/image/node/ConfigurationsImageNode.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.node; + +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.image.ConfigurationImage; +import org.apache.kafka.image.ConfigurationsImage; + +import java.util.ArrayList; +import java.util.Collection; + + +public class ConfigurationsImageNode implements MetadataNode { + /** + * The name of this node. + */ + public final static String NAME = "configs"; + + /** + * The configurations image. + */ + private final ConfigurationsImage image; + + public ConfigurationsImageNode(ConfigurationsImage image) { + this.image = image; + } + + @Override + public Collection<String> childNames() { + ArrayList<String> childNames = new ArrayList<>(); + for (ConfigResource configResource : image.resourceData().keySet()) { + if (configResource.isDefault()) { + childNames.add(configResource.type().name()); + } else { + childNames.add(configResource.type().name() + "_" + configResource.name()); + } + } + return childNames; + } + + private static ConfigResource resourceFromName(String name) { Review Comment: Need test to cover this code path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org