mjsax commented on code in PR #14554: URL: https://github.com/apache/kafka/pull/14554#discussion_r1362568238
########## clients/src/main/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsRequest.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData; +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class GetTelemetrySubscriptionsRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<GetTelemetrySubscriptionsRequest> { + + private final GetTelemetrySubscriptionsRequestData data; + + public Builder(GetTelemetrySubscriptionsRequestData data) { + this(data, false); + } + + public Builder(GetTelemetrySubscriptionsRequestData data, boolean enableUnstableLastVersion) { Review Comment: For my own education: why do we `enableUnstableLastVersion` flag and what does it exactly do? -- Looking into existing code, only a few existing request types use it, and it seems it always set to `false` in prod code anyway, and only set to `true` for some test code ########## clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json: ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ Review Comment: For my own education: This is used to generate the `XxxData` classes code? ########## clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java: ########## @@ -34,6 +34,7 @@ public void testDelayedAllocationSchemaDetection() { case EXPIRE_DELEGATION_TOKEN: case RENEW_DELEGATION_TOKEN: case ALTER_USER_SCRAM_CREDENTIALS: + case PUSH_TELEMETRY: Review Comment: For my own education: why do we need to add this here? Also, why don't we need to add the subscriptions-request? I have no idea what the comment `// verifies that schemas known to retain a reference to the underlying byte buffer are correctly detected.` means. ########## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.PushTelemetryRequestData; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.Errors; + +import java.nio.ByteBuffer; + +public class PushTelemetryRequest extends AbstractRequest { + + public static class Builder extends AbstractRequest.Builder<PushTelemetryRequest> { + + private final PushTelemetryRequestData data; + + public Builder(PushTelemetryRequestData data) { + this(data, false); + } + + public Builder(PushTelemetryRequestData data, boolean enableUnstableLastVersion) { + super(ApiKeys.PUSH_TELEMETRY, enableUnstableLastVersion); + this.data = data; + } + + @Override + public PushTelemetryRequest build(short version) { + return new PushTelemetryRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + private final PushTelemetryRequestData data; + + public PushTelemetryRequest(PushTelemetryRequestData data, short version) { + super(ApiKeys.PUSH_TELEMETRY, version); + this.data = data; + } + + @Override + public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { + PushTelemetryResponseData responseData = new PushTelemetryResponseData() + .setErrorCode(Errors.forException(e).code()); + responseData.setThrottleTimeMs(throttleTimeMs); Review Comment: nit: should we chain this call? ########## clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GetTelemetrySubscriptionsResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData() + .setThrottleTimeMs(10) + .setErrorCode(Errors.NONE.code()); + GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts()); + } + + @Test + public void testErrorCountsReturnsOneError() { + GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData() + .setThrottleTimeMs(10) + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); + data.setErrorCode(Errors.INVALID_CONFIG.code()); + + GetTelemetrySubscriptionsResponse response = new GetTelemetrySubscriptionsResponse(data); + Map<Errors, Integer> errorCounts = response.errorCounts(); + assertEquals(1, errorCounts.size()); + assertEquals(1, errorCounts.get(Errors.INVALID_CONFIG)); Review Comment: Would it be simple to so a single `assertEquals` with an `Map expectedMap`? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6222,4 +6222,62 @@ class KafkaApisTest { assertEquals(expectedResponse, response.data) } + + @Test + def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + data.setClientInstanceId(Uuid.randomUuid()) + + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) + createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) + } + + @Test + def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + data.setClientInstanceId(Uuid.ZERO_UUID) Review Comment: As above ########## clients/src/main/resources/common/message/GetTelemetrySubscriptionsRequest.json: ########## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 71, + "type": "request", + "listeners": ["broker", "controller"], Review Comment: For my own education: Why broker and controller? (Is this related to KRaft to distinguish both?) ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6222,4 +6222,62 @@ class KafkaApisTest { assertEquals(expectedResponse, response.data) } + + @Test + def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + data.setClientInstanceId(Uuid.randomUuid()) + + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) + createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) Review Comment: Test name says `NotAllowedForZkCluster`, so why do we get `UNKNOWN_SERVER_ERROR` back? ########## clients/src/test/java/org/apache/kafka/common/requests/GetTelemetrySubscriptionsResponseTest.java: ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class GetTelemetrySubscriptionsResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + GetTelemetrySubscriptionsResponseData data = new GetTelemetrySubscriptionsResponseData() + .setThrottleTimeMs(10) Review Comment: Why do we set throttle time? Seem we don't use it in the test? ########## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.util.Map; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PushTelemetryResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + PushTelemetryResponseData data = new PushTelemetryResponseData() + .setThrottleTimeMs(60) + .setErrorCode(Errors.NONE.code()); + PushTelemetryResponse response = new PushTelemetryResponse(data); + assertEquals(Collections.singletonMap(Errors.NONE, 1), response.errorCounts()); + } + + @Test + public void testErrorCountsReturnsOneError() { + PushTelemetryResponseData data = new PushTelemetryResponseData() + .setThrottleTimeMs(10) + .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); + data.setErrorCode(Errors.INVALID_CONFIG.code()); + + PushTelemetryResponse response = new PushTelemetryResponse(data); + Map<Errors, Integer> errorCounts = response.errorCounts(); + assertEquals(1, errorCounts.size()); + assertEquals(1, errorCounts.get(Errors.INVALID_CONFIG)); Review Comment: as above ########## clients/src/test/java/org/apache/kafka/common/requests/PushTelemetryResponseTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import java.util.Map; +import org.apache.kafka.common.message.PushTelemetryResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class PushTelemetryResponseTest { + + @Test + public void testErrorCountsReturnsNoneWhenNoErrors() { + PushTelemetryResponseData data = new PushTelemetryResponseData() + .setThrottleTimeMs(60) Review Comment: as above ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6222,4 +6222,62 @@ class KafkaApisTest { assertEquals(expectedResponse, response.data) } + + @Test + def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + data.setClientInstanceId(Uuid.randomUuid()) Review Comment: Do we even need to set the client instance id? The request with no ID set should also get rejected? Should we test both cases? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6222,4 +6222,62 @@ class KafkaApisTest { assertEquals(expectedResponse, response.data) } + + @Test + def testGetTelemetrySubscriptionsNotAllowedForZkClusters(): Unit = { + val data = new GetTelemetrySubscriptionsRequestData() + data.setClientInstanceId(Uuid.randomUuid()) + + val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(data, true).build()) + createKafkaApis(enableForwarding = true).handle(request, RequestLocal.NoCaching) + + val response = verifyNoThrottling[GetTelemetrySubscriptionsResponse](request) + assertEquals(Errors.UNKNOWN_SERVER_ERROR, Errors.forCode(response.data.errorCode)) + } + + @Test + def testGetTelemetrySubscriptionsUnsupportedVersionForKRaftClusters(): Unit = { Review Comment: This test would change down the road, when broker side code is added to support the request? (We know get error back based on the placeholder code in `KafkaApi.scala`?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org