[
https://issues.apache.org/jira/browse/SAMZA-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13837118#comment-13837118
]
Alan Li commented on SAMZA-94:
------------------------------
With Chris' help - the issue will hit any custom serde if it doesn't return
null on toBytes() when passed in a null, see my serde using Java serialization
below. The fix was to add an explicit check for (object == null) and return
null in that case. org.apache.samza.serializers.Serializer.toBytes() can be
documented to reflect this.
{noformat}
public final class AggregatorStateSerdeFactory implements
SerdeFactory<HashMap<String, State>>
{
@Override
public Serde<HashMap<String, State>> getSerde(String name, Config config)
{
return new AggregatorStateSerde();
}
private static final class AggregatorStateSerde implements
Serde<HashMap<String, State>>
{
@Override
public byte[] toBytes(HashMap<String, State> object)
{
try
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream objOut = new ObjectOutputStream(out);
objOut.writeObject(object);
objOut.close();
return out.toByteArray();
}
catch (IOException ex)
{
throw new IllegalStateException(ex);
}
}
@Override
public HashMap<String, State> fromBytes(byte[] bytes)
{
try
{
ObjectInputStream objIn = new ObjectInputStream(new
ByteArrayInputStream(bytes));
@SuppressWarnings("unchecked")
HashMap<String, State> object = (HashMap<String, State>)
objIn.readObject();
return object;
}
catch (IOException ex)
{
throw new IllegalStateException(ex);
}
catch (ClassNotFoundException ex)
{
throw new IllegalStateException(ex);
}
}
}
}
{noformat}
> State management kv-store delete() does not remove key
> ------------------------------------------------------
>
> Key: SAMZA-94
> URL: https://issues.apache.org/jira/browse/SAMZA-94
> Project: Samza
> Issue Type: Bug
> Components: kv
> Affects Versions: 0.6.0
> Reporter: Alan Li
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
>
> Sample code snippet:
> {noformat}
> _realtimeStateStore.put("foo", new HashMap());
>
> KeyValueIterator<String, Map<String, State>> realtimeIterator =
> _realtimeStateStore.all();
> while (realtimeIterator.hasNext())
> {
> Entry<String, Map<String, State>> entry = realtimeIterator.next();
> System.out.println("read from leveldb: " + entry.getKey() + "=>" +
> entry.getValue());
> }
> realtimeIterator.close();
>
> _realtimeStateStore.delete("foo");
>
> KeyValueIterator<String, Map<String, State>> iter =
> _realtimeStateStore.all();
> while (iter.hasNext())
> {
> Entry<String, Map<String, State>> entry = iter.next();
> System.out.println("read from leveldb: " + entry.getKey() + "=>" +
> entry.getValue());
> }
> iter.close();
> {noformat}
> Output:
> read from leveldb: foo=>{}
> read from leveldb: foo=>null
> I would expect that the kv would remove both the key & value on delete so
> that subsequent iterators will not return keys that have been deleted.
--
This message was sent by Atlassian JIRA
(v6.1#6144)