[ 
https://issues.apache.org/jira/browse/SAMZA-374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14101633#comment-14101633
 ] 

Jakob Homan commented on SAMZA-374:
-----------------------------------

All of these solutions seem pretty hacky, since the underlying problem is that 
Kafka has no reliable, safe way to delete existing topics.  The general 
workaround at the moment is to rename the job, which gives it a new checkpoint 
topic.  Alternatively, we could let users set a prefix in the config for the 
checkpoint topic name; when they need to discard an existing checkpoint, they 
would change (for example, increment) this prefix and then restart the job.
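
To make the prefix idea concrete, here is a rough sketch of how a checkpoint 
topic name could be derived.  The name format is inferred from the topic in 
the log quoted below (__samza_checkpoint_ver_1_for_my-job_i001).  The class 
CheckpointTopicNames, the method topicName, and the prefix parameter are all 
hypothetical, not existing Samza code or config.

```java
import java.util.Locale;

/** Hypothetical helper for illustration only; not part of Samza. */
final class CheckpointTopicNames {
    // Version number as it appears in the topic name in the quoted log.
    private static final int FORMAT_VERSION = 1;

    private CheckpointTopicNames() {}

    /**
     * Builds the checkpoint topic name. An empty or null prefix yields the
     * name format seen in the log; any other prefix is prepended, so that
     * changing the prefix points the job at a fresh, empty topic.
     */
    static String topicName(String prefix, String jobName, String jobId) {
        String base = String.format(Locale.ROOT,
                "__samza_checkpoint_ver_%d_for_%s_%s",
                FORMAT_VERSION, jobName, jobId);
        return (prefix == null || prefix.isEmpty()) ? base : prefix + base;
    }

    public static void main(String[] args) {
        // Same topic as in the log, then a topic after bumping the prefix.
        System.out.println(topicName("", "my-job", "i001"));
        System.out.println(topicName("v2", "my-job", "i001"));
    }
}
```

Under this sketch the old topic is simply left behind rather than deleted, 
which is why it sidesteps the Kafka topic-deletion problem.  Renaming the job 
has the same effect, because the job name is part of the topic name.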

> Need to be able to change SSP Grouper
> -------------------------------------
>
>                 Key: SAMZA-374
>                 URL: https://issues.apache.org/jira/browse/SAMZA-374
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>             Fix For: 0.8.0
>
>
> I recently ran a job with checkpointing enabled. The default grouper was used 
> (group by partition). I then decided that I wanted to increase parallelism, 
> so I set the grouper to group by SSP. This caused the container to get wedged 
> in this loop:
> {noformat}
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Verifying properties
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property client.id is 
> overridden to 
> samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property metadata.broker.list 
> is overridden to kafka-vip-e:10251
> 2014-08-08 18:56:06 VerifiableProperties [INFO] Property request.timeout.ms 
> is overridden to 60000
> 2014-08-08 18:56:06 ClientUtils$ [INFO] Fetching metadata from broker 
> id:0,host:kafka-vip-e,port:10251 with correlation id 7 for 1 topic(s) 
> Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 
> (requested 102400).
> 2014-08-08 18:56:06 SyncProducer [INFO] Connected to kafka-vip-e:10251 for 
> producing
> 2014-08-08 18:56:06 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
> 2014-08-08 18:56:06 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 
> topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Connecting to leader 
> app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch 
> all checkpoint messages.
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Got offset 13 for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch 
> messages for changelog partition mapping.
> 2014-08-08 18:56:06 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:06 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 
> 8192 (requested -1).
> 2014-08-08 18:56:06 KafkaCheckpointManager [INFO] Get latest offset 80626 for 
> topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
> 2014-08-08 18:56:07 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:07 KafkaCheckpointManager [WARN] While trying to read last 
> changelog partition mapping entry for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: 
> org.apache.samza.SamzaException: Exception while deserializing checkpoint 
> key. Retrying.
> 2014-08-08 18:56:07 KafkaCheckpointManager [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Exception while deserializing checkpoint key
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
>       at 
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
>       at 
> org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
>       at 
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>       at 
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
>       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> Caused by: 
> org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues:
>  Checkpoint key's SystemStreamPartition Grouper factory 
> (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not 
> match value from current configuration 
> (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).
>   This likely means the SystemStreamPartitionGrouper was changed between job 
> runs, which is not supported.
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
>       ... 15 more
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Verifying properties
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property client.id is 
> overridden to 
> samza_checkpoint_manager-repartition_by_treeid-i001-1407524103759-2
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property metadata.broker.list 
> is overridden to kafka-vip-e:10251
> 2014-08-08 18:56:17 VerifiableProperties [INFO] Property request.timeout.ms 
> is overridden to 60000
> 2014-08-08 18:56:17 ClientUtils$ [INFO] Fetching metadata from broker 
> id:0,host:kafka-vip-e,port:10251 with correlation id 8 for 1 topic(s) 
> Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 43690 (requested -1), SO_SNDBUF = 102400 
> (requested 102400).
> 2014-08-08 18:56:17 SyncProducer [INFO] Connected to kafka-vip-e:10251 for 
> producing
> 2014-08-08 18:56:17 SyncProducer [INFO] Disconnecting from kafka-vip-e:10251
> 2014-08-08 18:56:17 ClientUtils$ [DEBUG] Successfully fetched metadata for 1 
> topic(s) Set(__samza_checkpoint_ver_1_for_my-job_i001)
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Connecting to leader 
> app196.:10251 for topic __samza_checkpoint_ver_1_for_my-job_i001 and to fetch 
> all checkpoint messages.
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Got offset 14 for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0. Attempting to fetch 
> messages for changelog partition mapping.
> 2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:17 BlockingChannel [DEBUG] Created socket with SO_TIMEOUT = 
> 60000 (requested 60000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 
> 8192 (requested -1).
> 2014-08-08 18:56:17 KafkaCheckpointManager [INFO] Get latest offset 80626 for 
> topic __samza_checkpoint_ver_1_for_my-job_i001 and partition 0.
> 2014-08-08 18:56:17 SimpleConsumer [DEBUG] Disconnecting from app196.:10251
> 2014-08-08 18:56:17 KafkaCheckpointManager [WARN] While trying to read last 
> changelog partition mapping entry for topic 
> __samza_checkpoint_ver_1_for_my-job_i001 and partition 0: 
> org.apache.samza.SamzaException: Exception while deserializing checkpoint 
> key. Retrying.
> 2014-08-08 18:56:17 KafkaCheckpointManager [DEBUG] Exception detail:
> org.apache.samza.SamzaException: Exception while deserializing checkpoint key
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:177)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:300)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1$$anonfun$apply$9.apply(KafkaCheckpointManager.scala:292)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:67)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:292)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:254)
>       at 
> org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:253)
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:240)
>       at 
> org.apache.samza.util.Util$.getTaskNameToChangeLogPartitionMapping(Util.scala:280)
>       at 
> org.apache.samza.job.yarn.SamzaAppMasterTaskManager.<init>(SamzaAppMasterTaskManager.scala:79)
>       at 
> org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:90)
>       at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
> Caused by: 
> org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues:
>  Checkpoint key's SystemStreamPartition Grouper factory 
> (org.apache.samza.container.grouper.stream.GroupByPartitionFactory) does not 
> match value from current configuration 
> (org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory).
>   This likely means the SystemStreamPartitionGrouper was changed between job 
> runs, which is not supported.
>       at 
> org.apache.samza.checkpoint.kafka.KafkaCheckpointLogKey$.fromBytes(KafkaCheckpointLogKey.scala:170)
>       ... 15 more
> {noformat}
> I'm knowingly breaking grouping semantics because my job doesn't need it. As 
> I recall, this was discussed in SAMZA-123, and we were all worried about 
> people accidentally breaking their state/grouping, so we hard fail when the 
> grouper is changed. The problem is, I can't change the checkpoint topic name, 
> nor is it easy for me to delete the checkpoint messages in the topic, so I'm 
> kind of stuck.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
