[
https://issues.apache.org/jira/browse/SAMZA-141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini updated SAMZA-141:
----------------------------------
Description:
TaskInstance has a variable called offsets, which manages offsets for each
input stream. The variable is typed as SystemStream -> String. We use
SystemStream when interacting with it in all places EXCEPT the
TaskInstance.process method, where we use a SystemStreamPartition as the key.
This doesn't work: a SystemStreamPartition and a SystemStream with the same
system and stream name are NOT equal, because the equality checks do a
class check, and the classes don't match.
As a result, you end up with something like this:
{code}
var offsets = Map[SystemStream, String]()
offsets += new SystemStream("foo", "bar") -> "123"
offsets += new SystemStreamPartition("foo", "bar", new Partition(0)) -> "321"
System.err.println(offsets)
{code}
Which prints:
{noformat}
Map(SystemStream [system=foo, stream=bar] -> 123, SystemStreamPartition [partition=Partition [partition=0], system=foo, stream=bar] -> 321)
{noformat}
was:
TaskInstance has a variable called offsets, which manages offsets for each
input stream. The variable is typed as SystemStream -> String. We use
SystemStream when interacting with it in all places EXCEPT the
TaskInstance.process method, where we use a SystemStreamPartition as the key.
This doesn't work: a SystemStreamPartition and a SystemStream with the same
system and stream name are NOT equal, because the equality checks do a
class check, and the classes don't match.
As a result, you end up with something like this:
{code}
var offsets = Map[SystemStream, String]()
offsets += new SystemStream("foo", "bar") -> "123"
offsets += new SystemStreamPartition("foo", "bar", new Partition(0)) -> "321"
System.err.println(offsets)
{code}
Which prints:
{noformat}
Map(SystemStream [system=foo, stream=bar] -> 123, SystemStreamPartition [partition=Partition [partition=0], system=foo, stream=bar] -> 123)
{noformat}
> TaskInstance.offsets fails to update
> ------------------------------------
>
> Key: SAMZA-141
> URL: https://issues.apache.org/jira/browse/SAMZA-141
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
>
> TaskInstance has a variable called offsets, which manages offsets for each
> input stream. The variable is typed as SystemStream -> String. We use
> SystemStream when interacting with it in all places EXCEPT the
> TaskInstance.process method, where we use a SystemStreamPartition as the key.
> This doesn't work: a SystemStreamPartition and a SystemStream with the
> same system and stream name are NOT equal, because the equality checks
> do a class check, and the classes don't match.
> As a result, you end up with something like this:
> {code}
> var offsets = Map[SystemStream, String]()
> offsets += new SystemStream("foo", "bar") -> "123"
> offsets += new SystemStreamPartition("foo", "bar", new Partition(0)) -> "321"
> System.err.println(offsets)
> {code}
> Which prints:
> {noformat}
> Map(SystemStream [system=foo, stream=bar] -> 123, SystemStreamPartition [partition=Partition [partition=0], system=foo, stream=bar] -> 321)
> {noformat}
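
For anyone who wants to reproduce the behavior without a Samza checkout, here
is a minimal, self-contained sketch. Stream and StreamPartition are
hypothetical stand-ins for SystemStream and SystemStreamPartition, and their
equals/hashCode bodies are assumptions modeled on the class check described
above, not copies of the Samza sources. The toStream helper is also
hypothetical; it shows one possible way to normalize the key before the map
update, which may or may not match the fix that ends up being committed.

```scala
// Hypothetical stand-in for SystemStream: equality includes a class check.
class Stream(val system: String, val stream: String) {
  override def hashCode: Int = (system, stream).hashCode
  override def equals(other: Any): Boolean = other match {
    case that: Stream =>
      getClass == that.getClass && system == that.system && stream == that.stream
    case _ => false
  }
  override def toString: String = s"Stream [system=$system, stream=$stream]"
}

// Hypothetical stand-in for SystemStreamPartition: a subclass with one more field.
class StreamPartition(sys: String, name: String, val partition: Int)
    extends Stream(sys, name) {
  override def hashCode: Int = (system, stream, partition).hashCode
  override def equals(other: Any): Boolean = other match {
    case that: StreamPartition => super.equals(that) && partition == that.partition
    case _ => false
  }
  // Assumed helper: drop the partition so the key has the map's key class.
  def toStream: Stream = new Stream(system, stream)
  override def toString: String =
    s"StreamPartition [partition=$partition, system=$system, stream=$stream]"
}

object OffsetsSketch {
  private val ssp = new StreamPartition("foo", "bar", 0)
  private def initial: Map[Stream, String] =
    Map[Stream, String](new Stream("foo", "bar") -> "123")

  // Keyed by the partition object: a second entry is added, nothing is updated.
  def buggy: Map[Stream, String] = initial + (ssp -> "321")

  // Keyed by the normalized stream: the existing entry is overwritten.
  def fixed: Map[Stream, String] = initial + (ssp.toStream -> "321")

  def main(args: Array[String]): Unit = {
    println(buggy.size) // 2
    println(fixed.size) // 1
  }
}
```

The sketch uses an immutable Map and `+` rather than `var` and `+=` only to
keep the two cases side by side; the key comparison is the same either way.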