-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32052/#review76431
-----------------------------------------------------------



samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
<https://reviews.apache.org/r/32052/#comment123974>

    > 1. What would happen to the cache entry if we don't check 
partitionMetadata.errorCode?
    
    If we don't check partitionMetadata.errorCode *anywhere*, then any 
partition-level errors will get triggered when getOffsets is called. For 
example, if a leader is not available, getOffsets' OffsetFetchRequest will have 
an errorCode set that will say the leader isn't available.
    
    We're basically putting off the error check until after we've made the 
request we care about (e.g. produce request, offset fetch request, etc).
    
    > 2. Assuming that the errored partition metadata is not inserted in the 
cache, getOffsets would raise exception? And how do we capture that case?
    
    Yes, getOffsets calls:
    
          .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo))
    
    And then checks ths results:
    
            KafkaUtil.maybeThrowException(partitionErrorAndOffset.error)
    
    Any bad errorCodes from the metadata will also get surfaced here (e.g. 
unknown topic/partition, leader offline, etc).
    
    If there is a failure, the maybeThrowException method will throw an 
exception, which will be caught in KafkaSystemAdmin.getSystemStreamMetadata. 
The retryBackoff loop will then catch the exception, and restart from scratch.


- Chris Riccomini


On March 13, 2015, 7:56 p.m., Chris Riccomini wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32052/
> -----------------------------------------------------------
> 
> (Updated March 13, 2015, 7:56 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Bugs: SAMZA-592
>     https://issues.apache.org/jira/browse/SAMZA-592
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> refresh topic metadata if partitions have bad error codes. add a test
> 
> 
> add a little test to verify we ignore replica not available exceptions
> 
> 
> remove partition metadata check from KafkaSystemAdmin since it's already done 
> in getOffsets
> 
> 
> switch to KafkaUtil.maybeThrowException
> 
> 
> Diffs
> -----
> 
>   
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
>  4a1b31f025ba7b05a7b46041aa8e12074599ce24 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
> c6e231a2588ce95940aa2da9483a98c6115e38d9 
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala 
> 147aabc947f0cb01c0780edb693e9714f810b5f6 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
>  b790be17cfe08da28220ffb381cbd618ebe25cf0 
>   
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala
>  4a49d22a3fc403f624ca17a6414d84eaba1898be 
>   samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala 
> 2482f23cc6b9c072651df9cbfe9714ffeb203687 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
>  3d1e6ecbb3fd95816c722a68c4f5907120eb20d0 
>   
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala
>  e698d2f1f004740a4d74a488c469d8ca8426c6e4 
>   samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala 
> PRE-CREATION 
>   
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
>  a8b724bf781003142e455fdf1fed2f13d6c18353 
> 
> Diff: https://reviews.apache.org/r/32052/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Chris Riccomini
> 
>

Reply via email to