mumrah commented on code in PR #13643: URL: https://github.com/apache/kafka/pull/13643#discussion_r1263851139
########## metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.server.mutable.BoundedListTooLongException; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + + +public final class EventHandlerExceptionInfo { + /** + * True if this exception should be treated as a fault. Review Comment: Should we mention that this will increment the error metric? 
########## metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.server.mutable.BoundedListTooLongException; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + + +public final class EventHandlerExceptionInfo { + /** + * True if this exception should be treated as a fault. + */ + private final boolean isFault; + + /** + * True if this exception should cause a controller failover. 
+ * All faults cause failover + */ + private final boolean causesFailover; + + /** + * The internal exception. + */ + private final Throwable internalException; + + /** + * The exception to present to RPC callers, or Optional.empty if the internal exception should + * be presented directly. + */ + private final Optional<Throwable> externalException; + + /** + * Create an EventHandlerExceptionInfo object from an internal exception. + * + * @param internal The internal exception. + * @param latestControllerSupplier A function we can call to obtain the latest leader id. + * + * @return The new immutable info object. + */ + public static EventHandlerExceptionInfo fromInternal( + Throwable internal, + Supplier<OptionalInt> latestControllerSupplier + ) { + if (internal instanceof ApiException) { + // This exception is a standard API error response from the controller, which can pass + // through without modification. + return new EventHandlerExceptionInfo(false, false, internal); + } else if (internal instanceof NotLeaderException) { + // The controller has lost leadership. + return new EventHandlerExceptionInfo(false, true, internal, + ControllerExceptions.newWrongControllerException(latestControllerSupplier.get())); + } else if (internal instanceof RejectedExecutionException) { + // The controller event queue is shutting down. + return new EventHandlerExceptionInfo(false, false, internal, + new TimeoutException("The controller is shutting down.", internal)); + } else if (internal instanceof BoundedListTooLongException) { + // The operation could not be performed because it would have created an overly large + // batch. + return new EventHandlerExceptionInfo(false, false, internal, + new PolicyViolationException("Unable to perform excessively large batch " + + "operation.")); + } else if (internal instanceof UnexpectedEndOffsetException) { + // The active controller picked the wrong end offset for its next batch. It must now + // fail over. This should be pretty rare. 
+ return new EventHandlerExceptionInfo(false, true, internal, + new NotControllerException("Unexpected end offset. Controller not known.")); Review Comment: nit: "Controller not known"? Maybe "Controller will resign" or something similar? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -740,22 +735,24 @@ public Long apply(List<ApiMessageAndVersion> records) { // Start by trying to apply the record to our in-memory state. This should always // succeed; if it does not, that's a fatal error. It is important to do this before // scheduling the record for Raft replication. - int i = 1; + int i = 0; Review Comment: I realize it already existed, but can we rename `i` to something more descriptive? ########## raft/src/main/java/org/apache/kafka/raft/RaftClient.java: ########## @@ -172,15 +176,17 @@ default void beginShutdown() {} * uncommitted entries after observing an epoch change. * * @param epoch the current leader epoch + * @param requiredEndOffset if this is set, it is the offset we must use. Review Comment: nit: "offset which must be exactly the end offset of the atomic append", or something more precise than "use". ########## metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.errors; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.NotControllerException; +import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.errors.UnexpectedEndOffsetException; +import org.apache.kafka.server.mutable.BoundedListTooLongException; + +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.concurrent.RejectedExecutionException; +import java.util.function.Supplier; + + +public final class EventHandlerExceptionInfo { + /** + * True if this exception should be treated as a fault. + */ + private final boolean isFault; + + /** + * True if this exception should cause a controller failover. + * All faults cause failover + */ + private final boolean causesFailover; + + /** + * The internal exception. + */ + private final Throwable internalException; + + /** + * The exception to present to RPC callers, or Optional.empty if the internal exception should + * be presented directly. + */ + private final Optional<Throwable> externalException; + + /** + * Create an EventHandlerExceptionInfo object from an internal exception. + * + * @param internal The internal exception. + * @param latestControllerSupplier A function we can call to obtain the latest leader id. 
+ * + * @return The new immutable info object. + */ + public static EventHandlerExceptionInfo fromInternal( + Throwable internal, + Supplier<OptionalInt> latestControllerSupplier + ) { + if (internal instanceof ApiException) { + // This exception is a standard API error response from the controller, which can pass + // through without modification. + return new EventHandlerExceptionInfo(false, false, internal); + } else if (internal instanceof NotLeaderException) { + // The controller has lost leadership. + return new EventHandlerExceptionInfo(false, true, internal, + ControllerExceptions.newWrongControllerException(latestControllerSupplier.get())); + } else if (internal instanceof RejectedExecutionException) { + // The controller event queue is shutting down. + return new EventHandlerExceptionInfo(false, false, internal, + new TimeoutException("The controller is shutting down.", internal)); + } else if (internal instanceof BoundedListTooLongException) { + // The operation could not be performed because it would have created an overly large + // batch. + return new EventHandlerExceptionInfo(false, false, internal, + new PolicyViolationException("Unable to perform excessively large batch " + + "operation.")); + } else if (internal instanceof UnexpectedEndOffsetException) { + // The active controller picked the wrong end offset for its next batch. It must now + // fail over. This should be pretty rare. + return new EventHandlerExceptionInfo(false, true, internal, + new NotControllerException("Unexpected end offset. Controller not known.")); + } else if (internal instanceof InterruptedException) { + // The controller event queue has been interrupted. This normally only happens during + // a JUnit test that has hung. The test framework sometimes sends an InterruptException + // to all threads to try to get them to shut down. This isn't the correct way to shut + // the test, but it may happen if something hung. 
+ return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException("The controller was interrupted.")); + } else { + // This is the catch-all case for things that aren't supposed to happen. Null pointer + // exceptions, illegal argument exceptions, etc. They get translated into an + // UnknownServerException and a controller failover. + return new EventHandlerExceptionInfo(true, true, internal, + new UnknownServerException(internal)); + } + } + + /** + * Returns true if the class and message fields match for two exceptions. Handles nulls. + */ + static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) { + if (a == null) return b == null; + if (b == null) return false; + if (!a.getClass().equals(b.getClass())) return false; + if (!Objects.equals(a.getMessage(), b.getMessage())) return false; + return true; + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.empty(); + } + + EventHandlerExceptionInfo( + boolean isFault, + boolean causesFailover, + Throwable internalException, + Throwable externalException + ) { + this.isFault = isFault; + this.causesFailover = causesFailover; + this.internalException = internalException; + this.externalException = Optional.of(externalException); + } + + public boolean isFault() { + return isFault; + } + + public boolean causesFailover() { + return causesFailover; + } + + public Throwable effectiveExternalException() { + return externalException.orElse(internalException); + } + + public String failureMessage( + int epoch, + OptionalLong deltaUs, + boolean isActiveController, + long lastCommittedOffset + ) { + StringBuilder bld = new StringBuilder(); + if (deltaUs.isPresent()) { + bld.append("failed with "); + } else { + bld.append("unable to start processing because of "); + } + 
bld.append(internalException.getClass().getSimpleName()); + if (externalException.isPresent()) { + bld.append(" (treated as "). + append(externalException.get().getClass().getSimpleName()).append(")"); + } + if (causesFailover()) { + bld.append(" at epoch ").append(epoch); Review Comment: Can we just always log the epoch? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -973,14 +970,14 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) { log.debug("Replaying commits from the active node up to " + "offset {} and epoch {}.", offset, epoch); } - int i = 1; + int i = 0; Review Comment: Same `i` comment as above ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -458,38 +459,32 @@ private void handleEventEnd(String name, long startProcessingTimeNs) { controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs)); } - private Throwable handleEventException(String name, - OptionalLong startProcessingTimeNs, - Throwable exception) { - Throwable externalException = - ControllerExceptions.toExternalException(exception, () -> latestController()); - if (!startProcessingTimeNs.isPresent()) { - log.error("{}: unable to start processing because of {}. Reason: {}", name, - exception.getClass().getSimpleName(), exception.getMessage()); - return externalException; - } - long endProcessingTime = time.nanoseconds(); - long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); - long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS); - if (ControllerExceptions.isExpected(exception)) { - log.info("{}: failed with {} in {} us. 
Reason: {}", name, - exception.getClass().getSimpleName(), deltaUs, exception.getMessage()); - return externalException; + private Throwable handleEventException( + String name, + OptionalLong startProcessingTimeNs, + Throwable exception + ) { + OptionalLong deltaUs; + if (startProcessingTimeNs.isPresent()) { + long endProcessingTime = time.nanoseconds(); + long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong(); + deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS)); + } else { + deltaUs = OptionalLong.empty(); + } + EventHandlerExceptionInfo info = EventHandlerExceptionInfo. + fromInternal(exception, () -> latestController()); + String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs, + isActiveController(), lastCommittedOffset); + if (info.isFault()) { + nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception); + } else { + log.info("{}: {}", name, failureMessage); Review Comment: Why not log at ERROR level here? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ########## @@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) { renounce(); } } else if (newLeader.isLeader(nodeId)) { - log.info("Becoming the active controller at epoch {}, committed offset {}, " + - "committed epoch {}", newLeader.epoch(), lastCommittedOffset, - lastCommittedEpoch); - claim(newLeader.epoch()); + long newLastWriteOffset = endOffset - 1; Review Comment: After an election, is the endOffset reported by Raft both the last written and the last committed offset? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
