[
https://issues.apache.org/jira/browse/SAMZA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini updated SAMZA-608:
----------------------------------
Attachment: SAMZA-608-hello-samza.diff
I was able to reproduce this using hello-samza as a starting point. I'm
attaching a diff that sets hello-samza up to reproduce the problem.
Before running the job, create the input and output topics:
{noformat}
deploy/kafka/bin/kafka-topics.sh --create --topic sunky-raw --partitions 1 \
  --zookeeper localhost:2181 --replication-factor 1
deploy/kafka/bin/kafka-topics.sh --create --topic sunky-edit --partitions 1 \
  --zookeeper localhost:2181 --replication-factor 1
{noformat}
Then start the job:
{noformat}
deploy/samza/bin/run-job.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/deploy/samza/config/wikipedia-parser.properties
{noformat}
Then produce three messages to the sunky-raw topic (the second is malformed JSON):
{noformat}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "ACCOUNT_CREATE",
"localtime": "2014-07-14T01:22:37+02:00", "event_id":
"c7027d65-ff89-493f-bdf7-a93db8179001", "client_info": { "distributor":
"amazon", "device": "pc", "os": "windows", "platform": "portal", "type":
"browser" }, "client_id": "db78acd2-dfe7-453c-9922-7b21bb770afe", "partner_id":
"a1f9ec7a-db95-4154-b755-85916f2c7927", "game_id":
"a39b4ddb-b00c-4f4a-9761-9e1a7dc76aaa", "user_id":
"0b838454-fe38-4043-8643-0cecd77c9ae7", "account_id":
"7010f724-5ab1-42d7-beb4-b1cc4c532aaa", "guls": { "game": "GAME1", "user": 111,
"lang": "de", "server": "dev-1" }, "display_name": "NICKNAME1", "usernames":
["NICKNAME1","NICKNAME2"], "migration": { "signup_time":
"2009-07-14T15:22:37+02:00", "validation_time": "2009-07-14T15:24:45+02:00",
"last_login": "2014-07-14T15:22:37+02:00", "first_login":
"2009-07-14T15:25:33+02:00" }}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "ACCOUNT_BALANCE",
"localtime": "2014-07-14T15:22:37+02:00", "event_id":
"c7027d65-ff89-493f-bdf7-a93db8179680", "client_info": { "distributor":
"amazon", "device": "pc", "os": "windows", "platform": "portal", "type":
"browser" }, "client_id": "db78acd2-dfe7-453c-9922-7b21bb770afe", "partner_id":
"a1f9ec7a-db95-4154-b755-85916f2c7927", "game_id":
"a39b4ddb-b00c-4f4a-9761-9e1a7dc76d8f", "user_id":
"0b838454-fe38-4043-8643-0cecd77c9ae7", "account_id":
"7010f724-5ab1-42d7-beb4-b1cc4c5325e0", "guls": { "game": "GAME1", "user":
12345, "lang": "de", "server": "dev-50" }, "balance": { "type": "PURCHASE",
"amount": 1500, "currency": "GAME1_website#bonus", "remaining": 1500, "reason":
"veteran bonus" }, "item": { "quantity": 3, "short_desc":
"GAME1_item#7001570_ItemName", "long_desc": "GAME1_item#7001570_Description",
"id": "7001570", "category": [ "GAME1_item#Cat1","GAME1_item#Cat2","regular" ]
}, "transaction": { "id": 1d563cb5-80fb-43e0-9d4f-ef3dc8d0a742, "s":0}}
0b838454-fe38-4043-8643-0cecd77c9ae7###{ "qwevent": "USER_VERIFY", "localtime":
"2014-07-14T15:22:37+02:00", "event_id":
"c7027d65-ff89-493f-bdf7-a93db8179680", "client_info": { "distributor":
"amazon", "device": "pc", "os": "windows", "platform": "portal", "type":
"browser" }, "user_id": "0b838454-fe38-4043-8643-0cecd77c9ae7", "email":
"[email protected]"}
{noformat}
In the container's STDOUT, you'll see only the first message:
{noformat}
IncomingMessageEnvelope [systemStreamPartition=SystemStreamPartition [kafka,
sunky-raw, 0], offset=0, key=[B@47f9738, message={qwevent=ACCOUNT_CREATE,
localtime=2014-07-14T01:22:37+02:00,
event_id=c7027d65-ff89-493f-bdf7-a93db8179001, client_info={distributor=amazon,
device=pc, os=windows, platform=portal, type=browser},
client_id=db78acd2-dfe7-453c-9922-7b21bb770afe,
partner_id=a1f9ec7a-db95-4154-b755-85916f2c7927,
game_id=a39b4ddb-b00c-4f4a-9761-9e1a7dc76aaa,
user_id=0b838454-fe38-4043-8643-0cecd77c9ae7,
account_id=7010f724-5ab1-42d7-beb4-b1cc4c532aaa, guls={game=GAME1, user=111,
lang=de, server=dev-1}, display_name=NICKNAME1, usernames=[NICKNAME1,
NICKNAME2], migration={signup_time=2009-07-14T15:22:37+02:00,
validation_time=2009-07-14T15:24:45+02:00,
last_login=2014-07-14T15:22:37+02:00, first_login=2009-07-14T15:25:33+02:00}}]
{noformat}
If you check the container logs, you'll see that message 2 is dropped:
{noformat}
2015-03-19 14:26:53 SystemConsumers [DEBUG] Cannot deserialize an incoming message. Dropping the error message.
org.codehaus.jackson.JsonParseException: Unexpected character ('d' (code 100)): was expecting comma to separate OBJECT entries
 at [Source: [B@16fdec90; line: 1, column: 942]
    at org.codehaus.jackson.JsonParser._constructError(JsonParser.java:1291)
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportError(JsonParserMinimalBase.java:385)
    at org.codehaus.jackson.impl.JsonParserMinimalBase._reportUnexpectedChar(JsonParserMinimalBase.java:306)
    at org.codehaus.jackson.impl.Utf8StreamParser.nextToken(Utf8StreamParser.java:312)
    at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.mapObject(UntypedObjectDeserializer.java:183)
    at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:76)
    at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.mapObject(UntypedObjectDeserializer.java:204)
    at org.codehaus.jackson.map.deser.UntypedObjectDeserializer.deserialize(UntypedObjectDeserializer.java:76)
    at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2395)
    at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1667)
    at org.apache.samza.serializers.JsonSerde.fromBytes(JsonSerde.scala:33)
    at org.apache.samza.serializers.SerdeManager.fromBytes(SerdeManager.scala:115)
    at org.apache.samza.system.SystemConsumers.update(SystemConsumers.scala:290)
    at org.apache.samza.system.SystemConsumers.org$apache$samza$system$SystemConsumers$$poll(SystemConsumers.scala:260)
    at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
    at org.apache.samza.system.SystemConsumers$$anonfun$refresh$2.apply(SystemConsumers.scala:276)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:276)
    at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:213)
    at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
    at org.apache.samza.container.RunLoop$$anonfun$process$2$$anonfun$2.apply(RunLoop.scala:81)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
    at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:80)
    at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
    at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
    at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
    at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
{noformat}
But you'll never see message 3 until the job is restarted. Checking the
buffered message MBeans, I can see that the 3rd message is sitting in
KafkaSystemConsumer's buffer, but SystemConsumers is not pulling it out.
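For reference, message 2 appears to fail because its transaction id is an
unquoted UUID: a JSON parser reads the leading 1 as a number and then hits 'd'
where it expects a comma, which is consistent with the JsonParseException above.
The sketch below illustrates the same class of failure using Python's standard
json module instead of Jackson, so the error text differs. The payload is a
shortened stand-in for the tail of message 2, and try_parse is a hypothetical
helper, not part of Samza.

```python
import json

# Shortened stand-in for the tail of message 2: the transaction id is an
# unquoted UUID, which is not valid JSON.
malformed = '{"transaction": {"id": 1d563cb5-80fb-43e0-9d4f-ef3dc8d0a742, "s": 0}}'
# The same payload with the id quoted parses cleanly.
well_formed = '{"transaction": {"id": "1d563cb5-80fb-43e0-9d4f-ef3dc8d0a742", "s": 0}}'


def try_parse(payload):
    """Return (parsed_object, None) on success or (None, error) on failure."""
    try:
        return json.loads(payload), None
    except json.JSONDecodeError as err:
        return None, err


bad_value, bad_error = try_parse(malformed)
good_value, good_error = try_parse(well_formed)

# err.pos is the 0-based offset where parsing stopped; here it points at the
# 'd' that follows the leading digit of the unquoted UUID.
print("malformed parsed:", bad_value is not None)
print("offending character:", malformed[bad_error.pos])
print("well-formed id:", good_value["transaction"]["id"])
```

Quoting the id is enough to make the payload parse, so a corrected message 2
can be used as a control when checking whether the hang depends on the
deserialization error.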
> Deserialization error causes SystemConsumers to hang
> ----------------------------------------------------
>
> Key: SAMZA-608
> URL: https://issues.apache.org/jira/browse/SAMZA-608
> Project: Samza
> Issue Type: Task
> Components: container
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Fix For: 0.10.0
>
> Attachments: SAMZA-608-hello-samza.diff
>
>
> SamzaContainers seem to wedge if malformed messages are sent to them, even if
> {{task.drop.deserialization.errors=true}} is set. This was initially raised on the
> mailing list
> [here|http://mail-archives.apache.org/mod_mbox/samza-dev/201503.mbox/%3C94CF0C2E22F96641A963FB3B6D2911213A2715B2%40GF-KA-EX01.gf.local%3E].