[ 
https://issues.apache.org/jira/browse/SAMZA-1743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16572439#comment-16572439
 ] 

Prateek Maheshwari commented on SAMZA-1743:
-------------------------------------------

Thanks for the feedback, [~pgeorgantas]!

It's currently possible, although not obvious, to do a "map with state" 
using table descriptors, without having to write any table/store configuration. 
To do this, call StreamGraph#getTable with your TableDescriptor and ignore the 
returned Table reference. Then, in your MapFunction#init, get the Table 
reference using TaskContext#getTable(tableId) and save it in a field for use in 
MapFunction#apply.
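
A minimal, self-contained sketch of that init/apply pattern might look like the following. The Table, TaskContext and MapFunction types here are simplified stand-ins written only for this example (the real Samza interfaces have different signatures), and LookupFunction, InMemoryTable, InMemoryTaskContext and the "profiles" table id are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

final class MapWithStateSketch {

  // Simplified stand-ins for this example only; NOT the real Samza interfaces.
  interface Table<K, V> {
    V get(K key);
    void put(K key, V value);
  }

  interface TaskContext {
    <K, V> Table<K, V> getTable(String tableId);
  }

  interface MapFunction<M, OM> {
    void init(TaskContext context);
    OM apply(M message);
  }

  // Hypothetical map function: resolves its table once in init, reads it in apply.
  static final class LookupFunction implements MapFunction<String, String> {
    private final String tableId;
    private Table<String, String> table; // saved in init for use in apply

    LookupFunction(String tableId) {
      this.tableId = tableId;
    }

    @Override
    public void init(TaskContext context) {
      this.table = context.getTable(tableId);
    }

    @Override
    public String apply(String message) {
      return table.get(message);
    }
  }

  // Hypothetical in-memory table so the sketch runs without a real store.
  static final class InMemoryTable<K, V> implements Table<K, V> {
    private final Map<K, V> entries = new HashMap<>();

    @Override
    public V get(K key) {
      return entries.get(key);
    }

    @Override
    public void put(K key, V value) {
      entries.put(key, value);
    }
  }

  // Hypothetical context that hands out tables by id.
  static final class InMemoryTaskContext implements TaskContext {
    private final Map<String, Table<?, ?>> tables = new HashMap<>();

    void register(String tableId, Table<?, ?> table) {
      tables.put(tableId, table);
    }

    @Override
    @SuppressWarnings("unchecked")
    public <K, V> Table<K, V> getTable(String tableId) {
      return (Table<K, V>) tables.get(tableId);
    }
  }

  // Wires the pieces together the way a framework would: init once, then apply.
  static String lookup(String key) {
    InMemoryTable<String, String> profiles = new InMemoryTable<>();
    profiles.put("member-1", "Alice");
    InMemoryTaskContext context = new InMemoryTaskContext();
    context.register("profiles", profiles);

    LookupFunction fn = new LookupFunction("profiles");
    fn.init(context);
    return fn.apply(key);
  }

  public static void main(String[] args) {
    System.out.println(lookup("member-1"));
  }
}
```

The point of the sketch is only the shape of the function: the table id is passed in at construction time, the Table is looked up once in init, and apply reads from the saved reference.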

We could simplify this flow by adding new `MessageStream#mapWithState(Table, 
BiFunction<Table, M, OM>)` and `MessageStream#mapWithState(List<Table>, 
BiFunction<Map<String, Table>, M, OM>)` variants of map/filter/flatMap, at the 
cost of a new API for each operator. E.g.,
{code:java}
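// Register the table with the graph; no table/store configuration is needed.
TableDescriptor td = new RocksDBTableDescriptor();
Table table = graph.getTable(td);
...
inputStream
  // Proposed API: the function receives the table along with each message.
  .mapWithState(table, (t, message) -> t.get(message))
  .sendTo(outputStream);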
...{code}
Is this what you have in mind?

> Allow highlevel operators to have access to programmatically created state.
> ---------------------------------------------------------------------------
>
>                 Key: SAMZA-1743
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1743
>             Project: Samza
>          Issue Type: Improvement
>    Affects Versions: 0.14.0
>            Reporter: Peter Georgantas
>            Priority: Minor
>
> The window and join high-level operators must have state by definition. When 
> using these operators, the API will programmatically create the config for 
> that state. The API should allow more operators (filter, map, etc.) access to 
> the same programmatic state definition. Having this option would help close 
> the gap between code and properties files.
> Relevant code:
> [StatefulOperatorSpec.java|https://github.com/apache/samza/blob/161d1c47a2c7322a7d3197d571a227cce0f1cbbf/samza-core/src/main/java/org/apache/samza/operators/spec/StatefulOperatorSpec.java#L28]
> [JobNode.java|https://github.com/apache/samza/blob/161d1c47a2c7322a7d3197d571a227cce0f1cbbf/samza-core/src/main/java/org/apache/samza/execution/JobNode.java#L166]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
