| ... I am leaning towards Approach C as it is simpler, is not tied to BlockingEnvelopeMap, and keeps concerns separated. ...
Example Usages
High level stateless application
Code Block |
| language |
java |
| theme |
Emacs |
| collapse |
true |
|
/**
* Sample test case with multiple input partitions using the collection-based system.
*/
...
...
ImmutableList<IV> inputA = ...
ImmutableList<IV> inputB = ...
List<OV> outputData = ... // mutable
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
.from(ImmutableList.of(inputA, inputB));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
.from(outputData);
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
.map(...)
.sendTo(output);
app.run();
app.waitForFinish();
// assertions on outputData |
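To make the multi-partition idea above concrete, here is a minimal plain-Java sketch. It does not use the proposed `StreamDescriptor` or `StreamApplication` API. `CollectionPipelineSketch` and `runMap` are hypothetical names introduced only for illustration. The sketch assumes that each inner list stands for one input partition and that results are appended to a caller-supplied mutable list. It processes partitions one after another, whereas a real runner may interleave them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical stand-in for a collection-based test system; this is NOT Samza's API.
 * Each inner list of the input is treated as one partition.
 */
public class CollectionPipelineSketch {

    /** Applies mapFn to every message of every partition and appends the results to output. */
    public static <I, O> void runMap(List<List<I>> partitions, Function<I, O> mapFn, List<O> output) {
        for (List<I> partition : partitions) {
            for (I message : partition) {
                // Order is preserved within a partition.
                output.add(mapFn.apply(message));
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> inputA = Arrays.asList(1, 2, 3);  // partition 0
        List<Integer> inputB = Arrays.asList(10, 20);   // partition 1
        List<Integer> outputData = new ArrayList<>();   // mutable sink the test asserts on

        runMap(Arrays.asList(inputA, inputB), x -> x * 2, outputData);

        System.out.println(outputData); // prints [2, 4, 6, 20, 40]
    }
}
```

Because this sketch is sequential, the output order is deterministic. A test against a real multi-partition system should probably assert on the contents per partition, or on an unordered view, rather than on a global order.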
High level application with durable state
Code Block |
| language |
java |
| theme |
Emacs |
| collapse |
true |
|
/**
* Sample test case using collection based system for high level application with durable state.
* Note: The in-memory system cannot recover or bootstrap data for a store. This should work seamlessly once we have StoreDescriptor.
*/
...
...
ImmutableList<IV> inputA = ...
List<OV> outputData = ... // mutable
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("test-stream")
.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-test-stream")
.from(outputData);
// application logic
StreamApplication app = StreamApplication.create(...);
app.from(input)
.map(...)
.sendTo(output);
app.run();
app.waitForFinish();
// assertions on outputData
|
Low level stateless application
Code Block |
| language |
java |
| theme |
Emacs |
| collapse |
true |
|
/**
* Sample test case using collection based system for low level application.
*/
...
...
ImmutableList<IV> inputA = ...
List<OV> outputData = ... // mutable
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
.from(outputData);
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
.addInputs(Collections.singletonList(input))
.addOutputs(Collections.singletonList(output));
app.run();
app.waitForFinish();
// assertions on outputData |
Low level application with durable state
Code Block |
| language |
java |
| theme |
Emacs |
| collapse |
true |
|
/**
* Sample test case using collection based system for low level application with durable state.
* Note: The in-memory system cannot recover or bootstrap data for a store. This should work seamlessly once we have StoreDescriptor.
*/
...
...
ImmutableList<IV> inputA = ...
List<OV> outputData = ... // mutable
StreamDescriptor.Input<IK,IV> input = StreamDescriptor.<IK,IV>input("input-stream-low-level-app")
.from(ImmutableList.of(inputA));
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
.from(outputData);
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
.addInputs(Arrays.asList(input, changelog)) // singletonList accepts exactly one element
.addOutputs(Collections.singletonList(output));
app.run();
app.waitForFinish();
// assertions on outputData
|
Low level application with custom IME
Code Block |
| language |
java |
| theme |
Emacs |
| collapse |
true |
|
/**
* Sample test case using the collection-based system for a low level application with a custom IME (IncomingMessageEnvelope).
* It demonstrates using IMEs as the data source instead of raw messages. Users are responsible for creating
* complete IME objects, including partition information.
*/
...
...
ImmutableList<MyIME> inputData = Utils.createMyIME(...);
List<OV> outputData = ... // mutable
StreamDescriptor.Input<Object, Object> input = StreamDescriptor.<Object, Object>input("input-stream-low-level-app")
.from(inputData);
StreamDescriptor.Output<OK,OV> output = StreamDescriptor.<OK,OV>output("output-stream-low-level-app")
.from(outputData);
// application logic
StreamTaskApplication app = StreamTaskApplication.create(config, new MyTaskFactory())
.addInputs(Arrays.asList(input, changelog)) // singletonList accepts exactly one element
.addOutputs(Collections.singletonList(output));
app.run();
app.waitForFinish();
// assertions on outputData |
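The custom IME example above leaves it to the user to build complete envelopes with partition information. As a rough illustration of what a helper such as `Utils.createMyIME(...)` might do, the following plain-Java sketch wraps raw messages into envelopes for a single partition. `EnvelopeSketch`, `Envelope`, and `wrap` are hypothetical names. The class is not Samza's `IncomingMessageEnvelope`, and assigning sequential offsets starting at 0 is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical envelope helper for illustration; NOT Samza's IncomingMessageEnvelope. */
public class EnvelopeSketch {

    /** Immutable message wrapper that carries the partition information a test must supply. */
    public static final class Envelope<V> {
        public final String stream;
        public final int partition;
        public final long offset;
        public final V message;

        Envelope(String stream, int partition, long offset, V message) {
            this.stream = stream;
            this.partition = partition;
            this.offset = offset;
            this.message = message;
        }
    }

    /** Wraps raw messages for one partition, assigning sequential offsets from 0 (an assumption). */
    public static <V> List<Envelope<V>> wrap(String stream, int partition, List<V> messages) {
        List<Envelope<V>> envelopes = new ArrayList<>(messages.size());
        long offset = 0;
        for (V message : messages) {
            envelopes.add(new Envelope<>(stream, partition, offset++, message));
        }
        return envelopes;
    }

    public static void main(String[] args) {
        List<Envelope<String>> inputData = wrap("input-stream-low-level-app", 1, Arrays.asList("a", "b"));
        for (Envelope<String> e : inputData) {
            System.out.println(e.stream + " p" + e.partition + " @" + e.offset + " -> " + e.message);
        }
    }
}
```

Building envelopes by hand like this gives a test full control over partition and offset, at the cost of the test owning details that the raw-message variant would otherwise derive.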
Low level application with manual checkpoint
...
Unsupported Use cases
For V1, we don't support checkpointing ...
... will not support the following use cases since they have dependencies.
- High level application with durable state
- Low level application with durable state
- Application with manual checkpoint. Note: manual checkpointing is a no-op and might not produce the desired behaviour.
Samza SQL application
Users should be able to use the in-memory collection-based system to test Samza SQL applications, provided Samza SQL integrates with SEP-2. ... |