[ 
https://issues.apache.org/jira/browse/SAMZA-94?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13837118#comment-13837118
 ] 

Alan Li commented on SAMZA-94:
------------------------------

With Chris' help, we found that the issue affects any custom serde whose 
toBytes() does not return null when it is passed null (see my serde below, 
which uses Java serialization). The fix was to add an explicit check for 
(object == null) and return null in that case. The documentation for 
org.apache.samza.serializers.Serializer.toBytes() could be updated to reflect 
this.

{noformat}
public final class AggregatorStateSerdeFactory implements SerdeFactory<HashMap<String, State>>
{
  @Override
  public Serde<HashMap<String, State>> getSerde(String name, Config config)
  {
    return new AggregatorStateSerde();
  }

  private static final class AggregatorStateSerde implements Serde<HashMap<String, State>>
  {
    @Override
    public byte[] toBytes(HashMap<String, State> object)
    {
      try
      {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ObjectOutputStream objOut = new ObjectOutputStream(out);
        objOut.writeObject(object);
        objOut.close();
        return out.toByteArray();
      }
      catch (IOException ex)
      {
        throw new IllegalStateException(ex);
      }
    }

    @Override
    public HashMap<String, State> fromBytes(byte[] bytes)
    {
      try
      {
        ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
        @SuppressWarnings("unchecked")
        HashMap<String, State> object = (HashMap<String, State>) objIn.readObject();
        return object;
      }
      catch (IOException ex)
      {
        throw new IllegalStateException(ex);
      }
      catch (ClassNotFoundException ex)
      {
        throw new IllegalStateException(ex);
      }
    }
  }
}
{noformat}
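
For reference, here is a minimal standalone sketch of the null check described 
above. It is not the actual patched serde: the class name NullSafeSerdeSketch 
is hypothetical, String stands in for State, and it uses plain static methods 
instead of the Samza Serde interface so that it compiles without Samza on the 
classpath. The null guard in fromBytes() is my own assumption for symmetry; the 
fix described above only concerns toBytes().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

// Hypothetical stand-in for the serde; String replaces the State type.
public class NullSafeSerdeSketch
{
  public static byte[] toBytes(HashMap<String, String> object)
  {
    // Explicit null check: without it, Java serialization encodes null as a
    // non-empty byte array instead of returning null.
    if (object == null)
    {
      return null;
    }
    try
    {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      ObjectOutputStream objOut = new ObjectOutputStream(out);
      objOut.writeObject(object);
      objOut.close();
      return out.toByteArray();
    }
    catch (IOException ex)
    {
      throw new IllegalStateException(ex);
    }
  }

  public static HashMap<String, String> fromBytes(byte[] bytes)
  {
    // Assumption: mirror the null handling on the read path as well.
    if (bytes == null)
    {
      return null;
    }
    try
    {
      ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes));
      try
      {
        @SuppressWarnings("unchecked")
        HashMap<String, String> object = (HashMap<String, String>) objIn.readObject();
        return object;
      }
      finally
      {
        objIn.close();
      }
    }
    catch (IOException ex)
    {
      throw new IllegalStateException(ex);
    }
    catch (ClassNotFoundException ex)
    {
      throw new IllegalStateException(ex);
    }
  }

  public static void main(String[] args)
  {
    HashMap<String, String> state = new HashMap<String, String>();
    state.put("foo", "bar");
    // Round trip of a non-null map, then the null case.
    System.out.println(fromBytes(toBytes(state)));
    System.out.println(toBytes(null) == null);
  }
}
```

With the check in place, a null value reaches the underlying store as null 
bytes rather than as a serialized null object, which appears to be what the 
store needs in order to treat the write as a delete.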


> State management kv-store delete() does not remove key
> ------------------------------------------------------
>
>                 Key: SAMZA-94
>                 URL: https://issues.apache.org/jira/browse/SAMZA-94
>             Project: Samza
>          Issue Type: Bug
>          Components: kv
>    Affects Versions: 0.6.0
>            Reporter: Alan Li
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>
> Sample code snippet: 
> {noformat}
>     _realtimeStateStore.put("foo", new HashMap());
>     
>     KeyValueIterator<String, Map<String, State>> realtimeIterator = 
> _realtimeStateStore.all();
>     while (realtimeIterator.hasNext())
>     {
>       Entry<String, Map<String, State>> entry = realtimeIterator.next();
>       System.out.println("read from leveldb: " + entry.getKey() + "=>" + 
> entry.getValue());
>     }
>     realtimeIterator.close();
>     
>     _realtimeStateStore.delete("foo");
>     
>     KeyValueIterator<String, Map<String, State>> iter = 
> _realtimeStateStore.all();
>     while (iter.hasNext())
>     {
>       Entry<String, Map<String, State>> entry = iter.next();
>       System.out.println("read from leveldb: " + entry.getKey() + "=>" + 
> entry.getValue());
>     }
>     iter.close();
> {noformat}
> Output:
>   read from leveldb: foo=>{}
>   read from leveldb: foo=>null
> I would expect that the kv would remove both the key & value on delete so 
> that subsequent iterators will not return keys that have been deleted.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
