GitHub user guozhangwang opened a pull request:
https://github.com/apache/kafka/pull/130
KIP-28: First patch
Some open questions collected so far on the first patch. Thanks @gwenshap
@jkreps.
1. Can we hide the Chooser interface from users? In other words, if users
can specify the "time" on each message fetched from Kafka, would a hard-coded
MinTimestampMessageChooser be sufficient, so that we can move TimestampTracker /
RecordQueue / Chooser / RecordCollector / etc. all to the internal folders?
2. Shall we split o.a.k.clients into two folders, with
o.a.k.clients.processor in stream? Or should we just remove
o.a.k.clients.processor and put everything under o.a.k.stream? In addition,
there is currently a cyclic dependency between the two, which we had better
break in the end state.
3. Topology API: requiring users to instantiate their own Topology class
with the overridden build() function is a little awkward. Instead it would be
great to let users explicitly build the topology in Main and pass it in as a
class:
```
Topology myTopology = new TopologyBuilder(defaultDeser)
    .addProcessor("my-processor", MyProcessor.class, new Source("my-source"))
    .addProcessor("my-other-processor", MyOtherProcessor.class, "my-processor");
KafkaStreaming streaming = new KafkaStreaming(config, myTopology);
streaming.run();
```
So the implementation of KStream.filter would instead look like this:
```
public KStream<K, V> filter(Predicate<K, V> predicate) {
    KStreamFilter<K, V> filter = new KStreamFilter<>();
    topology.addProcessor(KStreamFilter.class, new Configs("predicate", predicate));
    return this;
}
```
The advantage is that user code can now use the builder and get rid of the
whole Topology class. I think the order of execution for that API is quite
unintuitive.
4. We can probably move the forward() function from Processor to
ProcessorContext, and split ProcessorContext into two classes: one with the
action calls such as commit / send / schedule / forward, and another with the
metadata calls such as topic / partition / offset / timestamp.
5. Merge ProcessorConfigs with ProcessorProperties.
6. Consider moving the external dependencies such as RocksDB into a
separate jar? For example we can just include a kafka-stream-rocksdb.jar which
includes the RocksDBKeyValueStore only, and later on when we deprecate / remove
such implementations we can simply remove the jar itself.
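To make question 1 more concrete, here is a minimal sketch of what a hard-coded min-timestamp chooser could look like. All class and method names below (StampedRecord, PartitionQueue, MinTimestampChooser, drain) are hypothetical and are not taken from the patch; the sketch also assumes that timestamps are non-decreasing within each partition, so the head record's timestamp stands in for the queue's minimum.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch only; these are not the classes in the patch.
class MinTimestampChooserSketch {

    // A fetched message tagged with a user-specified timestamp.
    static final class StampedRecord {
        final String value;
        final long timestamp;

        StampedRecord(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // FIFO buffer for one partition.
    static final class PartitionQueue {
        private final Deque<StampedRecord> records = new ArrayDeque<>();

        void add(StampedRecord record) { records.addLast(record); }
        StampedRecord poll() { return records.pollFirst(); }
        boolean isEmpty() { return records.isEmpty(); }

        // Only valid on a non-empty queue (assumption: head carries the minimum).
        long headTimestamp() { return records.peekFirst().timestamp; }
    }

    // Hands out the non-empty queue whose head record has the smallest timestamp.
    static final class MinTimestampChooser {
        private final PriorityQueue<PartitionQueue> heap =
            new PriorityQueue<>(Comparator.comparingLong(PartitionQueue::headTimestamp));

        // Empty queues are never placed in the heap.
        void add(PartitionQueue queue) {
            if (!queue.isEmpty()) {
                heap.add(queue);
            }
        }

        // Removes and returns the chosen queue, or null if nothing is buffered.
        PartitionQueue next() { return heap.poll(); }
    }

    // Drains every queue, always taking the record with the smallest head timestamp.
    static List<String> drain(List<PartitionQueue> queues) {
        MinTimestampChooser chooser = new MinTimestampChooser();
        for (PartitionQueue queue : queues) {
            chooser.add(queue);
        }
        List<String> out = new ArrayList<>();
        PartitionQueue queue;
        while ((queue = chooser.next()) != null) {
            out.add(queue.poll().value);
            // The head changed, so re-insert the queue under its new timestamp.
            chooser.add(queue);
        }
        return out;
    }
}
```

If something this simple covers the common case, users would only need to supply the timestamp for each message, and the chooser plus the queue classes could live in the internal packages.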
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/confluentinc/kafka streaming
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/130.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #130
----
commit 1527f277dc33a18ce348a357d7883349af72fc49
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-24T16:40:46Z
First commit
commit abc220a0f2217a69103466fc3e9bdcf92502a15a
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-25T17:56:58Z
stream synchronization
commit dc200a460f31331bbb5b2bcc3f0567e5fb80904e
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-29T16:27:06Z
ported many stuff from Jay's streming prototype
commit 34d02b21e46902df77e844b1b8b30043fa98cff3
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-29T20:52:49Z
removed punctuate method, added punctuationqueue per streamsynchronizer
commit d7c068990513c4ded6f0efbd04d9995c1a69db85
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-29T21:39:13Z
fixed compile warnings
commit f81d463df655349af25889e2a3319403fa017d6d
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-29T23:19:49Z
added sync to PunctuationSchedulerImpl
commit 480ee6d41e26499a3e035f8471a82431b3733014
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T00:09:23Z
pass stream time to puctuate()
commit b20bbd2d596f6e5e53a0eaab0f8f88275c5e8e8b
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T16:46:16Z
removed flush method from KStream and Processor
commit 583348bad6e7dd2ea62078231d757de72a7ea0ec
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T17:09:35Z
simplified recordqueue interface
commit 8cce08db01c0d3df62ba52316fade3dfd58cba24
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T18:03:40Z
separated timestamp stacking from queue impl
commit 31576183c1011e68fe54774d4160c81ab17c59ab
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T19:03:18Z
comments
commit 20b52d40514313616f49980f737597ff9a4c961c
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T19:33:59Z
comments
commit 2c4f3335c88a397557dede3d5745a7b69ee068ea
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T21:24:40Z
added Ingestor interface, renamed RegulatedConsumer to IngestorImpl
commit 9f561b18268dc7bc6d90099cdcee2e602d027373
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T21:54:18Z
added KStreamBranchTest
commit ee3e923bd2fbc92d3b24449a13bd64ead5bc1f20
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T23:10:06Z
more tests
commit 0c790442a6d82796140d63939952806f88426913
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-06-30T23:29:01Z
more tests
commit 566d45778e00ccd2bc027811675c92a86f143025
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T16:04:03Z
use Ingestor interface in StreamSycnhronizer and KStreamContextImpl
commit b388a3ede7784263c9236b3e12ffd0de3f6ded06
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T16:51:42Z
more join tests
commit 02af7188154bcb64ba3aa60afc3f1fa6c1c61241
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T17:16:28Z
test FilteredIterator
commit 50dbcb8998e825e510495458f0daa32064ae1be3
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T17:48:34Z
test MinTimestampTracker
commit 1d9d63d5bc880c2f8e69b2b5044ca5dcfec4f67e
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T20:47:38Z
more test, and removed StreamSynchronizerFactory
commit 56ce42495dba119e908fb8720bfe31e5e5b87da7
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T21:07:49Z
removed KStreamContext.steamingConfig() method
commit 3ab1730edfc358f8a4a4fc54fb53da6bc994d6ea
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T21:17:21Z
parameter name
commit 36bcdd6f74bb336d92a619666ee9a649dc1d7b9a
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T21:33:38Z
added Ingestor.numPartition(topic) method
commit e3d4c598f69e7d15b3ac21f3246173fc55fa207f
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T22:09:08Z
removed unused member variables
commit ff6df57dbee920637b3566db5073d9b4312b2c95
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T23:02:48Z
more test
commit b021f43f45c9e228829707d31b4e8458b183d702
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T23:15:50Z
moved StreamSynchronizer to internal
commit c27397fb8d111a9f70e248b9c1e741701324490b
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-01T23:32:31Z
moved Chooser and RecordQueue to internal
commit 72fd3abf2ac8568e296ed8a5a79213a04dbe2419
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-02T18:05:12Z
removed nestedLoop, added joinPrior
commit a32fa7315d8736ec7ce7be00841121929fb3ab44
Author: Yasuhiro Matsuda <[email protected]>
Date: 2015-07-02T18:29:44Z
test joinPior
----