[ https://issues.apache.org/jira/browse/SAMZA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated SAMZA-608:
----------------------------------
    Attachment: SAMZA-608-hello-samza.diff

I was able to reproduce this using hello-samza as a starting point. I'm attaching a diff that sets up hello-samza to reproduce the problem.

Before running the job, create the input and output topics:

{noformat}
deploy/kafka/bin/kafka-topics.sh --create --topic sunky-raw --partitions 1 \
  --zookeeper localhost:2181 --replication-factor 1
deploy/kafka/bin/kafka-topics.sh --create --topic sunky-edit --partitions 1 \
  --zookeeper localhost:2181 --replication-factor 1
{noformat}

Then start the job:

{noformat}
deploy/samza/bin/run-job.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
{noformat}

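Then produce the following three messages to the sunky-raw topic, one message per line. Message 2 is malformed JSON: its {{transaction}} {{id}} value ({{1d563cb5-80fb-43e0-9d4f-ef3dc8d0a742}}) is not quoted.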

{noformat}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "ACCOUNT_CREATE", "localtime": "2014-07-14T01:22:37+02:00", "event_id": "c7027d65-ff89-493f-bdf7-a93db8179001", "client_info": { "distributor": "amazon", "device": "pc", "os": "windows", "platform": "portal", "type": "browser" }, "client_id": "db78acd2-dfe7-453c-9922-7b21bb770afe", "partner_id": "a1f9ec7a-db95-4154-b755-85916f2c7927", "game_id": "a39b4ddb-b00c-4f4a-9761-9e1a7dc76aaa", "user_id": "0b838454-fe38-4043-8643-0cecd77c9ae7", "account_id": "7010f724-5ab1-42d7-beb4-b1cc4c532aaa", "guls": { "game": "GAME1", "user": 111, "lang": "de", "server": "dev-1" }, "display_name": "NICKNAME1", "usernames": ["NICKNAME1","NICKNAME2"], "migration": { "signup_time": "2009-07-14T15:22:37+02:00", "validation_time": "2009-07-14T15:24:45+02:00", "last_login": "2014-07-14T15:22:37+02:00", "first_login": "2009-07-14T15:25:33+02:00" }}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "ACCOUNT_BALANCE", "localtime": "2014-07-14T15:22:37+02:00", "event_id": "c7027d65-ff89-493f-bdf7-a93db8179680", "client_info": { "distributor": "amazon", "device": "pc", "os": "windows", "platform": "portal", "type": "browser" }, "client_id": "db78acd2-dfe7-453c-9922-7b21bb770afe", "partner_id": "a1f9ec7a-db95-4154-b755-85916f2c7927", "game_id": "a39b4ddb-b00c-4f4a-9761-9e1a7dc76d8f", "user_id": "0b838454-fe38-4043-8643-0cecd77c9ae7", "account_id": "7010f724-5ab1-42d7-beb4-b1cc4c5325e0", "guls": { "game": "GAME1", "user": 12345, "lang": "de", "server": "dev-50" }, "balance": { "type": "PURCHASE", "amount": 1500, "currency": "GAME1_website#bonus", "remaining": 1500, "reason": "veteran bonus" }, "item": { "quantity": 3, "short_desc": "GAME1_item#7001570_ItemName", "long_desc": "GAME1_item#7001570_Description", "id": "7001570", "category": [ "GAME1_item#Cat1","GAME1_item#Cat2","regular" ] }, "transaction": { "id": 1d563cb5-80fb-43e0-9d4f-ef3dc8d0a742, "s":0}}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "USER_VERIFY", "localtime": "2014-07-14T15:22:37+02:00", "event_id": "c7027d65-ff89-493f-bdf7-a93db8179680", "client_info": { "distributor": "amazon", "device": "pc", "os": "windows", "platform": "portal", "type": "browser" }, "user_id": "0b838454-fe38-4043-8643-0cecd77c9ae7", "email": "[email protected]"}
{noformat}
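Each input line above has the form {{<key>###<json>}}. The container output shows a byte-array key and a message that contains only the JSON, which suggests the producer split each line at the first {{###}}: the user_id became the key and the JSON became the value. The sketch below works under that assumption. The class {{KeyedLine}} is a hypothetical helper made up for illustration. It is not part of Samza or of any Kafka tool.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (illustration only): splits one producer input line of
 * the form "<key>###<json>" into a key and a value. This mirrors what a
 * line-based producer configured with "###" as its key separator is ASSUMED
 * to do with the repro messages.
 */
public class KeyedLine {
    final byte[] key;   // raw key bytes, e.g. the user_id
    final String value; // JSON payload; assumed to be the only part the JSON serde parses

    KeyedLine(byte[] key, String value) {
        this.key = key;
        this.value = value;
    }

    static KeyedLine parse(String line) {
        int sep = line.indexOf("###");
        if (sep < 0) {
            throw new IllegalArgumentException("no ### separator in: " + line);
        }
        // Everything before the first "###" is the key; everything after it is the value.
        byte[] key = line.substring(0, sep).getBytes(StandardCharsets.UTF_8);
        String value = line.substring(sep + "###".length());
        return new KeyedLine(key, value);
    }

    public static void main(String[] args) {
        KeyedLine kl = parse("0b838454-fe38-4043-8643-0cecd77c9ae7###{ \"qwevent\": \"USER_VERIFY\" }");
        System.out.println(new String(kl.key, StandardCharsets.UTF_8)); // the key part
        System.out.println(kl.value);                                   // the JSON part
    }
}
```

Under this assumption, the parse error location reported for message 2 is a position inside the JSON value, not inside the full input line.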

In the container's STDOUT, you'll see message 1:

{noformat}
IncomingMessageEnvelope [systemStreamPartition=SystemStreamPartition [kafka, 
sunky-raw, 0], offset=0, key=[B@47f9738, message={qwevent=ACCOUNT_CREATE, 
localtime=2014-07-14T01:22:37+02:00, 
event_id=c7027d65-ff89-493f-bdf7-a93db8179001, client_info={distributor=amazon, 
device=pc, os=windows, platform=portal, type=browser}, 
client_id=db78acd2-dfe7-453c-9922-7b21bb770afe, 
partner_id=a1f9ec7a-db95-4154-b755-85916f2c7927, 
game_id=a39b4ddb-b00c-4f4a-9761-9e1a7dc76aaa, 
user_id=0b838454-fe38-4043-8643-0cecd77c9ae7, 
account_id=7010f724-5ab1-42d7-beb4-b1cc4c532aaa, guls={game=GAME1, user=111, 
lang=de, server=dev-1}, display_name=NICKNAME1, usernames=[NICKNAME1, 
NICKNAME2], migration={signup_time=2009-07-14T15:22:37+02:00, 
validation_time=2009-07-14T15:24:45+02:00, 
last_login=2014-07-14T15:22:37+02:00, first_login=2009-07-14T15:25:33+02:00}}]
{noformat}

The container logs show that message 2 fails to deserialize and is dropped:

{noformat}
2015-03-19 14:26:53 SystemConsumers [DEBUG] Cannot deserialize an incoming message. Dropping the error message.
org.codehaus.jackson.JsonParseException: Unexpected character ('d' (code 100)): was expecting comma to separate OBJECT entries
 at [Source: [B@16fdec90; line: 1, column: 942]
        at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
        at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
        at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:312)
        at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.mapObject(UntypedObjectDeserializer.java:183)
        at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:76)
        at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.mapObject(UntypedObjectDeserializer.java:204)
        at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:76)
        at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2395)
        at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
        at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
        at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
        at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
        at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
        at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
        at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
        at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
        at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
        at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
        at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
        at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
        at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
        at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
        at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
        at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
        at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
{noformat}

However, message 3 never shows up until the job is restarted. The buffered-message MBeans show that the third message is sitting in KafkaSystemConsumer's buffer, but SystemConsumers never pulls it out.
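For comparison, below is a minimal sketch of the behavior one would expect with {{task.drop.deserialization.errors=true}}. A message that fails to deserialize is logged and skipped, and consumption continues, so message 3 would still be delivered after message 2 is dropped. This is a hypothetical toy model, not Samza's SystemConsumers. The names {{DropOnErrorLoop}} and {{consume}} are made up for illustration, and the sketch makes no claim about the root cause of the hang.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical toy model (NOT Samza's SystemConsumers) of a consume loop that
 * honors a "drop deserialization errors" setting.
 */
public class DropOnErrorLoop {

    /**
     * Deserializes every buffered raw message in order. With dropErrors=true,
     * a message whose deserialization throws is logged and skipped, and the
     * loop moves on to the next buffered message instead of stalling.
     */
    public static <T> List<T> consume(List<String> buffered,
                                      Function<String, T> deserializer,
                                      boolean dropErrors) {
        List<T> delivered = new ArrayList<>();
        for (int offset = 0; offset < buffered.size(); offset++) {
            String raw = buffered.get(offset);
            T message;
            try {
                message = deserializer.apply(raw);
            } catch (RuntimeException e) {
                if (!dropErrors) {
                    throw e; // dropping disabled: surface the error
                }
                System.err.println("Dropping offset " + offset + ": " + e.getMessage());
                continue; // the important part: keep consuming later messages
            }
            delivered.add(message);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Stand-in deserializer: rejects anything marked as malformed.
        Function<String, String> toyDeserializer = raw -> {
            if (raw.contains("MALFORMED")) {
                throw new IllegalArgumentException("cannot parse " + raw);
            }
            return raw.toUpperCase();
        };
        List<String> buffered = List.of("account_create", "MALFORMED balance", "user_verify");
        // prints [ACCOUNT_CREATE, USER_VERIFY]; the middle message is dropped
        System.out.println(consume(buffered, toyDeserializer, true));
    }
}
```

The expected behavior is that the third message still comes out of the loop. In the report above, the third message instead stays in KafkaSystemConsumer's buffer.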

> Deserialization error causes SystemConsumers to hang
> ----------------------------------------------------
>
>                 Key: SAMZA-608
>                 URL: https://issues.apache.org/jira/browse/SAMZA-608
>             Project: Samza
>          Issue Type: Task
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>             Fix For: 0.10.0
>
>         Attachments: SAMZA-608-hello-samza.diff
>
>
> SamzaContainers seem to wedge if malformed messages are sent to them, even if 
> {{task.drop.deserialization.errors=true}}. This was initially raised on the 
> mailing list 
> [here|http://mail-archives.apache.org/mod_mbox/samza-dev/201503.mbox/%3C94CF0C2E22F96641A963FB3B6D2911213A2715B2%40GF-KA-EX01.gf.local%3E].



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
