dajac commented on code in PR #13511: URL: https://github.com/apache/kafka/pull/13511#discussion_r1166515699
########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -576,7 +576,10 @@ class GroupMetadataManager(brokerId: Int, } } - private def doLoadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = { + // Visible for testing + private[group] def doLoadGroupsAndOffsets(topicPartition: TopicPartition, Review Comment: nit: It seems that we could revert this change now. ########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -1273,9 +1283,10 @@ object GroupMetadataManager { throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)") } else { GroupMetadataManager.readMessageKey(record.key) match { - case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) - case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) - case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)") + case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) + case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) + case unknownKey: UnknownKey => (Some(s"UNKNOWN(version=${unknownKey.version})"), None) Review Comment: nit: In order to follow the format of the offset and group keys, I wonder if we should use `unknown::version=$version`. What do you think? 
########## core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala: ########## @@ -148,17 +149,23 @@ object TransactionLog { // Formatter for use with tools to read transaction log messages class TransactionLogMessageFormatter extends MessageFormatter { def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { - Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey => - val transactionalId = txnKey.transactionalId - val value = consumerRecord.value - val producerIdMetadata = if (value == null) - None - else - readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) - output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) - output.write("::".getBytes(StandardCharsets.UTF_8)) - output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) - output.write("\n".getBytes(StandardCharsets.UTF_8)) + Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { + case txnKey: TxnKey => + val transactionalId = txnKey.transactionalId + val value = consumerRecord.value + val producerIdMetadata = if (value == null) + None + else + readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) + output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) + output.write("::".getBytes(StandardCharsets.UTF_8)) + output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) + output.write("\n".getBytes(StandardCharsets.UTF_8)) + + case _: UnknownKey => // Only print if this message is a transaction record + + case unexpectedKey => + throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.") Review Comment: `writeTo` is actually used to write records to the log. It does not make sense to swallow unknown records here. Therefore, we should throw an `IllegalStateException` for unknown records as well. 
Moreover, the message should be changed, as `while reading transaction log` is not correct. ########## core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala: ########## @@ -167,25 +174,44 @@ object TransactionLog { * Exposed for printing records using [[kafka.tools.DumpLogSegments]] */ def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = { - val txnKey = TransactionLog.readTxnRecordKey(record.key) - val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" - - val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { - case None => "<DELETE>" - - case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + - s"producerEpoch:${txnMetadata.producerEpoch}," + - s"state=${txnMetadata.state}," + - s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + - s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + - s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" - } + TransactionLog.readTxnRecordKey(record.key) match { + case txnKey: TxnKey => + val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" + + val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { + case None => "<DELETE>" + + case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + + s"producerEpoch:${txnMetadata.producerEpoch}," + + s"state=${txnMetadata.state}," + + s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + + s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + + s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" + } + + (Some(keyString), Some(valueString)) + + case _: UnknownKey => + (Some("<UNKNOWN>"), Some("<UNKNOWN>")) - (Some(keyString), Some(valueString)) + case unexpectedKey => + throw new IllegalStateException(s"Found unexpected key $unexpectedKey while formatting transaction log.") Review Comment: I think that we could remove this one if we use a
`sealed trait`. ########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int, removedGroups.add(groupId) } - case unknownKey => - throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") + case _: UnknownKey => // do nothing + + case unexpectedKey => + throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata") Review Comment: I think that we could use a `sealed trait` to fix this. Does it work? ########## core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala: ########## @@ -167,25 +174,44 @@ object TransactionLog { * Exposed for printing records using [[kafka.tools.DumpLogSegments]] */ def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = { - val txnKey = TransactionLog.readTxnRecordKey(record.key) - val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" - - val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { - case None => "<DELETE>" - - case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + - s"producerEpoch:${txnMetadata.producerEpoch}," + - s"state=${txnMetadata.state}," + - s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + - s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + - s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" - } + TransactionLog.readTxnRecordKey(record.key) match { + case txnKey: TxnKey => + val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}" + + val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match { + case None => "<DELETE>" + + case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," + + s"producerEpoch:${txnMetadata.producerEpoch}," + + s"state=${txnMetadata.state}," + + 
s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," + + s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," + + s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}" + } + + (Some(keyString), Some(valueString)) + + case _: UnknownKey => + (Some("<UNKNOWN>"), Some("<UNKNOWN>")) Review Comment: Let's use the same format as we used for the groups. ########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ########## @@ -1085,4 +1087,41 @@ class TransactionStateManagerTest { assertTrue(partitionLoadTime("partition-load-time-max") >= 0) assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0) } + Review Comment: nit: There is an extra empty line. ########## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ########## @@ -467,16 +467,26 @@ class TransactionStateManager(brokerId: Int, memRecords.batches.forEach { batch => for (record <- batch.asScala) { require(record.hasKey, "Transaction state log's key should not be null") - val txnKey = TransactionLog.readTxnRecordKey(record.key) - // load transaction metadata along with transaction state - val transactionalId = txnKey.transactionalId - TransactionLog.readTxnRecordValue(transactionalId, record.value) match { - case None => - loadedTransactions.remove(transactionalId) - case Some(txnMetadata) => - loadedTransactions.put(transactionalId, txnMetadata) + TransactionLog.readTxnRecordKey(record.key) match { + case txnKey: TxnKey => + // load transaction metadata along with transaction state + val transactionalId = txnKey.transactionalId + TransactionLog.readTxnRecordValue(transactionalId, record.value) match { + case None => + loadedTransactions.remove(transactionalId) + case Some(txnMetadata) => + loadedTransactions.put(transactionalId, txnMetadata) + } + currOffset = batch.nextOffset + + case unknownKey: UnknownKey => + warn(s"Unknown message key with version ${unknownKey.version}" + + s" while loading transaction state. 
Ignoring it. " + + s"It could be a left over from an aborted upgrade.") + + case unexpectedKey => + throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.") Review Comment: I think that we could remove this one if we use a `sealed trait`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org