Yi/Milinda, I am trying to initialize a kv store. I have the following properties defined:

stores.store-name.key.serde=json
stores.store-name.msg.serde=json
stores.store-name.changelog=argos.windowchangelog

How do I define a key serde? I am getting this exception:

Exception in thread "main" org.apache.samza.SamzaException: Must define a key serde when using key value storage.
    at org.apache.samza.storage.kv.BaseKeyValueStorageEngineFactory$class.getStorageEngine(BaseKeyValueStorageEngineFactory.scala:86)
    at org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory.getStorageEngine(RocksDbKeyValueStorageEngineFactory.scala:28)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:455)
    at org.apache.samza.container.SamzaContainer$$anonfun$35$$anonfun$38.apply(SamzaContainer.scala:439)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:439)
    at org.apache.samza.container.SamzaContainer$$anonfun$35.apply(SamzaContainer.scala:416)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
    at scala.collection.SetLike$class.map(SetLike.scala:93)
    at scala.collection.AbstractSet.map(Set.scala:47)
    at org.apache.samza.container.SamzaContainer$.apply(SamzaContainer.scala:416)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:63)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:62)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:37)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)

On Mon, Jun 29, 2015 at 12:41 PM, Shekar Tippur <ctip...@gmail.com> wrote:

> Yi,
>
> My use case is more of the latter. Your explanation makes sense now. I was
> also looking into Milinda's wiki. She has a section for Kafka partition
> SimplePartitioner, which is simple enough as well.
>
> Thanks for all the inputs. Let me see what I come up with while
> implementing it.
>
> - Shekar
>
> On Mon, Jun 29, 2015 at 10:42 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Shekar,
>>
>> First, I would like to clarify what you meant by sliding window: is it
>> defined as windows with size N and advance step size of 1 (which means
>> that windows overlap and each input message would contribute to multiple
>> counts in different windows)? Or windows with size N and advance step
>> size of N (i.e. each incoming message only contributes to one counter in
>> a single window)?
>>
>> If your use case falls into the first category, you will need something
>> more sophisticated as discussed in SAMZA-552. If your use case is the
>> second one, there could be a simpler version of SAMZA-552 that you can go
>> with:
>>
>> 1) Initiate a KV-store that uses the application name as the key
>> 2) For each incoming message, look up the window for the message by the
>> application name
>> 3) Update the counter and update the value in the KV-store based on the
>> application name
>> 4) Every 5 min when the window() method is triggered, set all counters to
>> zero (this can be done in a lazy way as well, by keeping the last reset
>> timestamp in the record in the KV-store, keyed by application name. Then,
>> resetting the counter to zero can be done the next time the application
>> counter is updated)
>>
>> Hope that makes sense.
>>
>> -Yi
>>
>> On Mon, Jun 29, 2015 at 10:06 AM, Shekar Tippur <ctip...@gmail.com>
>> wrote:
>>
>> > Benjamin,
>> >
>> > Thanks for the explanation. We don't have any specific partition scheme
>> > as yet. We just have 2 topics - raw and processed and we use default
>> > partitioning scheme.
>> > Can you share any code snippet so I can understand it better?
>> >
>> > - Shekar
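
For reference, the four steps in Yi's quoted message (a store keyed by application name, a per-message counter update, and a lazy reset driven by a stored timestamp) could be sketched roughly as below. This is only an illustration under stated assumptions: a plain HashMap stands in for the Samza KV store, the names CounterRecord, TumblingWindowCounter and increment are hypothetical, and timestamps are assumed to be non-decreasing milliseconds. It is not code from the thread or from Samza itself.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical record kept per application: the count and the time of its last reset. */
final class CounterRecord {
    final long count;
    final long lastResetMs;

    CounterRecord(long count, long lastResetMs) {
        this.count = count;
        this.lastResetMs = lastResetMs;
    }
}

/** Non-overlapping (step size N) window counter with lazy reset. */
final class TumblingWindowCounter {
    private final long windowSizeMs;

    // Assumption: a HashMap stands in for the KV store, keyed by application name.
    private final Map<String, CounterRecord> store = new HashMap<>();

    TumblingWindowCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Counts one message for appName at time nowMs and returns the updated count. */
    long increment(String appName, long nowMs) {
        CounterRecord current = store.get(appName);
        CounterRecord updated;
        if (current == null || nowMs - current.lastResetMs >= windowSizeMs) {
            // Lazy reset: the window has expired (or never existed), so start from one.
            updated = new CounterRecord(1L, nowMs);
        } else {
            // Still inside the current window: bump the count, keep the reset time.
            updated = new CounterRecord(current.count + 1L, current.lastResetMs);
        }
        store.put(appName, updated);
        return updated.count;
    }
}
```

With a 5 minute window (300000 ms), two messages for the same application inside the window yield counts 1 and 2, and the first message at or after the 5 minute mark starts again from 1. With the lazy reset, nothing has to be done for idle applications when window() fires.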