Hey Tommy, Can you include the config for your job? It's true that the RocksDB flush call doesn't do anything, but Samza composes a bunch of wrapper stores on top of the raw store. You can see this composition in BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually looks something like this:
NullSafeStore -> CachedStore -> SerailizedStore -> LoggedStore -> RocksDB The most important one here is CachedStore. In the past, we've had a few bugs similar to what you're describing, but I believe that we've hammered them all out. The TestKeyValueStores class does some fairly rigorous checks of the type you're describing. Three questions: 1. Can you include your job's config? 2. Can you write a unit test that reproduces this? Even if it has to loop 1000s of times before it occurs, it'd be useful. 3. Have you tried using LevelDB instead of RocksDB for comparison? It will help determine whether it's an issue with RocksDB itself, or with our caching class. Cheers, Chris On 1/8/15 9:51 PM, "Yan Fang" <[email protected]> wrote: >Hi Tommy, > >This seems weird. flush() does not do anything. Also, I tried the same >thing you were showing, I can see the value from *store.get("foo")*... >Simply do > > private KeyValueStore<String, String> store; > > public void init(Config config, TaskContext context) { > > this.store = (KeyValueStore<String, String>) >context.getStore("printout-stats"); > > } > > public void process(IncomingMessageEnvelope envelope, MessageCollector >collector, TaskCoordinator coordinator) { > > store.put("foo", "bar"); > > System.out.println(" foo result " + fooResult); > > } > >Maybe others in the community have better ideas why it happens. > >Thanks, >Fang, Yan >[email protected] >+1 (206) 849-4108 > >On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker <[email protected]> wrote: > >> We are writing some tasks that make use of the RocksDb kv store, and a >> couple of us here have seen the same strange behavior. Essentially, >>get() >> seems to not return values that were previously put(), until flush() is >> called. But even if flush isn't called, the put values will be >>returned by >> store.all(). A colleague took a look and saw that >> RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we >> haven't yet looked deeper. To summarize, the behavior seems to be: >> >> store.put("foo", "bar); >> store.get("foo") returns null ?? >> store.all() returns an iterator containing ("foo", "bar") >> store.flush(); >> store.get("foo") returns "bar" as expected >> >> Is this behavior intended? >> >> ________________________________ >> >> This email and any attachments may contain confidential and privileged >> material for the sole use of the intended recipient. Any review, >>copying, >> or distribution of this email (or any attachments) by others is >>prohibited. >> If you are not the intended recipient, please contact the sender >> immediately and permanently delete this email and any attachments. No >> employee or agent of TiVo Inc. is authorized to conclude any binding >> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo >> Inc. may only be made by a signed written agreement. >>
