[
https://issues.apache.org/jira/browse/IGNITE-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15071825#comment-15071825
]
Denis Magda commented on IGNITE-2016:
-------------------------------------
Roman,
Agree, there is no need to set up auto-flushing on the IgniteDataStreamer side, since
Kafka Connect will call {{IgniteDataStreamer.flush()}} periodically (from
{{SinkTask.flush()}}). Moreover, IgniteDataStreamer's buffers are flushed
automatically whenever {{IgniteDataStreamer.perNodeBufferSize}} is reached,
meaning the streamer doesn't have to wait until {{flush()}} is called and can push
data to a cache asynchronously. The default value of
{{IgniteDataStreamer.perNodeBufferSize}} is 1024, and I think it's fine to leave it
as is rather than expose an additional configuration parameter to the user; one can
be added in the future if needed.
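For illustration, here is a minimal sketch of how the streamer could be set up in
{{SinkTask.start()}}; the cache name {{testCache}} and the {{Ignition.ignite()}}
lookup are assumptions for the example, not part of the actual patch.
{noformat}
@Override public void start(Map<String, String> props) {
    // Assumes an Ignite node is already running in this JVM;
    // "testCache" is a placeholder cache name.
    ds = Ignition.ignite().dataStreamer("testCache");

    // perNodeBufferSize is left at its default (1024), so filled
    // buffers are pushed to the grid asynchronously without waiting
    // for an explicit flush() call.
}
{noformat}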
To summarize, here is the pseudo-code I have in mind for the implementation of
{{IgniteSinkTask}}'s {{put}} and {{flush}} methods based on
{{IgniteDataStreamer}}, with details omitted.
{noformat}
public class IgniteSinkTask extends SinkTask {
    /** Cache data streamer. */
    private IgniteDataStreamer<Object, Object> ds;

    /**
     * Buffers records.
     *
     * @param records Records to inject into grid.
     */
    @Override public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            if (record.key() != null)
                // Data will be flushed asynchronously when
                // IgniteDataStreamer.perNodeBufferSize is reached.
                ds.addData(record.key(), record.value());
        }
    }

    /**
     * Pushes buffered data to grid. The flush interval is configured
     * in the worker configuration.
     *
     * @param offsets Offset information.
     */
    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Flush any pending data in the streamer.
        ds.flush();
    }
}
{noformat}
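Correspondingly, the streamer should be closed when the task stops. A minimal
sketch; placing the cleanup in {{SinkTask.stop()}} is an assumption here:
{noformat}
@Override public void stop() {
    // Closing the streamer flushes any remaining buffered data
    // before releasing its resources.
    if (ds != null)
        ds.close();
}
{noformat}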
> Update KafkaStreamer to fit new features introduced in Kafka 0.9
> ----------------------------------------------------------------
>
> Key: IGNITE-2016
> URL: https://issues.apache.org/jira/browse/IGNITE-2016
> Project: Ignite
> Issue Type: New Feature
> Components: streaming
> Reporter: Roman Shtykh
> Assignee: Roman Shtykh
>
> Particularly,
> - new consumer
> - Kafka Connect (Copycat)
> http://www.confluent.io/blog/apache-kafka-0.9-is-released
> This can be a different integration task or a complete re-write of the
> current implementation, considering the fact that Kafka Connect is a new
> standard way for "large-scale, real-time data import and export for Kafka."