----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32052/#review76431 -----------------------------------------------------------
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala <https://reviews.apache.org/r/32052/#comment123974> > 1. What would happen to the cache entry if we don't check partitionMetadata.errorCode? If we don't check partitionMetadata.errorCode *anywhere*, then any partition-level errors will get triggered when getOffsets is called. For example, if a leader is not available, getOffsets' OffsetFetchRequest will have an errorCode set that will say the leader isn't available. We're basically putting off the error check until after we've made the request we care about (e.g. produce request, offset fetch request, etc). > 2. Assuming that the errored partition metadata is not inserted in the cache, getOffsets would raise exception? And how do we capture that case? Yes, getOffsets calls: .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo)) And then checks ths results: KafkaUtil.maybeThrowException(partitionErrorAndOffset.error) Any bad errorCodes from the metadata will also get surfaced here (e.g. unknown topic/partition, leader offline, etc). If there is a failure, the maybeThrowException method will throw an exception, which will be caught in KafkaSystemAdmin.getSystemStreamMetadata. The retryBackoff loop will then catch the exception, and restart from scratch. - Chris Riccomini On March 13, 2015, 7:56 p.m., Chris Riccomini wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/32052/ > ----------------------------------------------------------- > > (Updated March 13, 2015, 7:56 p.m.) > > > Review request for samza. > > > Bugs: SAMZA-592 > https://issues.apache.org/jira/browse/SAMZA-592 > > > Repository: samza > > > Description > ------- > > refresh topic metadata if partitions have bad error codes. add a test > > > add a little test to verify we ignore replica not available exceptions > > > remove partition metadata check from KafkaSystemAdmin since it's already done > in getOffsets > > > switch to KafkaUtil.maybeThrowException > > > Diffs > ----- > > > samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala > 4a1b31f025ba7b05a7b46041aa8e12074599ce24 > samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala > c6e231a2588ce95940aa2da9483a98c6115e38d9 > samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala > 147aabc947f0cb01c0780edb693e9714f810b5f6 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala > b790be17cfe08da28220ffb381cbd618ebe25cf0 > > samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala > 4a49d22a3fc403f624ca17a6414d84eaba1898be > samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala > 2482f23cc6b9c072651df9cbfe9714ffeb203687 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala > 3d1e6ecbb3fd95816c722a68c4f5907120eb20d0 > > samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala > e698d2f1f004740a4d74a488c469d8ca8426c6e4 > samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala > PRE-CREATION > > samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala > a8b724bf781003142e455fdf1fed2f13d6c18353 > > Diff: https://reviews.apache.org/r/32052/diff/ > > > Testing > ------- > > > Thanks, > > Chris Riccomini > >