Hi, Jack,

Unfortunately, this would happen for any store that has a changelog
configured, even if you try to iterate and remove the large records
*before* they are flushed. The reason you saw this in CachedStore.all()
is that CachedStore calls flush() when creating the iterator, so the
large-record exception surfaces during that flush. If you disable the
CachedStore, then LoggedStore.put() would have already sent the large
record to Kafka, and you would get the exception in commit() instead; the
whole container would fail anyway, even if you created an iterator and
removed the record from the KV-store. Since the flushing of dirty records
from the CachedStore to the LoggedStore is not controlled by user code,
there is no guaranteed way to iterate the CachedStore and delete the large
record before it is sent to Kafka.
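
For context, here is a simplified sketch of that call path (in Java,
inferred from the stack trace in your message, not the actual Samza
source; the class and field names are illustrative only):

    import org.apache.samza.storage.kv.KeyValueIterator;
    import org.apache.samza.storage.kv.KeyValueStore;

    // Illustrative sketch: CachedStore.all() flushes before iterating.
    class CachedStoreSketch<K, V> {
        private final KeyValueStore<K, V> underlying; // e.g. the LoggedStore

        CachedStoreSketch(KeyValueStore<K, V> underlying) {
            this.underlying = underlying;
        }

        KeyValueIterator<K, V> all() {
            // Dirty cached records are written through to LoggedStore.putAll,
            // which sends them to the Kafka changelog. An oversized record
            // fails right here, before the iterator ever exists.
            flush();
            return underlying.all();
        }

        void flush() {
            // writes all dirty cached entries to the underlying store
        }
    }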

For now, the only safe way to avoid a large record is to check its
serialized size before writing it to the store.
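
For example, a minimal sketch of that check (MAX_RECORD_BYTES and the
serde reference here are assumptions, not part of your setup; pick a
limit safely below your broker's max message size, which defaults to
roughly 1 MB):

    import org.apache.samza.serializers.Serde;
    import org.apache.samza.storage.kv.KeyValueStore;

    public class SizeCheckedWriter {
        // Hypothetical threshold: leave headroom under the broker limit
        // for the key, wrapper, and protocol overhead.
        private static final int MAX_RECORD_BYTES = 900 * 1024;

        private final KeyValueStore<String, MyObject> store;
        private final Serde<MyObject> serde; // same serde the store uses

        public SizeCheckedWriter(KeyValueStore<String, MyObject> store,
                                 Serde<MyObject> serde) {
            this.store = store;
            this.serde = serde;
        }

        // Writes only if the serialized value fits; returns false otherwise,
        // so the caller can truncate, split, or drop the record instead.
        public boolean putIfFits(String key, MyObject value) {
            byte[] bytes = serde.toBytes(value);
            if (bytes.length > MAX_RECORD_BYTES) {
                return false;
            }
            store.put(key, value);
            return true;
        }
    }

Note this serializes the value twice (once for the check, once when the
store writes it); if that overhead matters, you could check an
approximate size instead.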

Thanks!

-Yi

On Wed, May 4, 2016 at 11:43 AM, Jack Huang <jackhu...@mz.com> wrote:

> The following code
>
> for (KeyValueIterator<String, MyObject> itor = myStore.all();
>      itor.hasNext(); ) {
>     ...
> }
>
> throws the exception:
>
> org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 8 to system kafka.
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply$mcV$sp(KafkaSystemProducer.scala:152)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$flush$1.apply(KafkaSystemProducer.scala:136)
>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.updateTimer(KafkaSystemProducer.scala:39)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.flush(KafkaSystemProducer.scala:136)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$stop$1.apply(KafkaSystemProducer.scala:56)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.stop(KafkaSystemProducer.scala:56)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120)
>         at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:116)
>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
>         at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:91)
>         at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
>         at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
>         at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:77)
>         at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:73)
>         at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:193)
>         at org.apache.samza.storage.kv.CachedStore.all(CachedStore.scala:134)
>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.all(NullSafeKeyValueStore.scala:78)
>         at org.apache.samza.storage.kv.KeyValueStorageEngine.all(KeyValueStorageEngine.scala:79)
>         at MyTask.window(SessionizeTask.java:192)
>         at org.apache.samza.container.TaskInstance$$anonfun$window$1.apply$mcV$sp(TaskInstance.scala:166)
>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>         at org.apache.samza.container.TaskInstance.window(TaskInstance.scala:165)
>         at org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:146)
>         at org.apache.samza.container.RunLoop$$anonfun$window$1$$anonfun$apply$mcVJ$sp$5.apply(RunLoop.scala:143)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>         at org.apache.samza.container.RunLoop$$anonfun$window$1.apply$mcVJ$sp(RunLoop.scala:143)
>         at org.apache.samza.util.TimerUtils$class.updateTimerAndGetDuration(TimerUtils.scala:51)
>         at org.apache.samza.container.RunLoop.updateTimerAndGetDuration(RunLoop.scala:35)
>         at org.apache.samza.container.RunLoop.window(RunLoop.scala:137)
>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:75)
>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:553)
>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:92)
>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:66)
>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
>
> Here is the config for myStore:
>
> serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
> serializers.registry.serializable.class=org.apache.samza.serializers.SerializableSerdeFactory
> stores.myStore.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
> stores.myStore.changelog=kafka.__samza_kv_MyTask_MyStore
> stores.myStore.key.serde=string
> stores.myStore.msg.serde=serializable
>
> The size of MyObject is unbounded, so I am not surprised to get a
> RecordTooLargeException. However, I get the exception while merely
> attempting to create the KeyValueIterator with KeyValueStore.all(), which
> means I cannot even recover from this error by deleting the too-large
> record. Can anyone explain how I can recover from this exception?
>
>
> Regards,
>
>
> Jack
>
