[ https://issues.apache.org/jira/browse/IGNITE-1191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698833#comment-14698833 ]
Alexey Goncharuk edited comment on IGNITE-1191 at 9/2/15 10:40 PM:
-------------------------------------------------------------------
I'd like to suggest a preliminary API for continuous queries that will allow us to
implement all the changes listed in the subtasks of this ticket.
I think we should extend the {{CacheEntryListenerConfiguration}} available in
JCache and add a flag that defines whether an initial iteration is required
(the entries it produces will be delivered to the same listener):
{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}
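To make the intent of the flag concrete, here is a minimal sketch that does not use Ignite or JCache at all. {{ToyCache}}, {{register}} and the {{BiConsumer}} listener are hypothetical stand-ins, and the sketch assumes that "initial iteration" means replaying the entries already in the cache to the same listener before any live update is delivered:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Demo of the assumed initial-iteration semantics (hypothetical names throughout). */
class InitialIterationDemo {
    /** Returns the "key=value" strings seen by a listener registered after one put. */
    static List<String> run(boolean initIterRequired) {
        ToyCache<String, Integer> cache = new ToyCache<>();

        cache.put("a", 1); // Present before the listener is registered.

        List<String> seen = new ArrayList<>();

        cache.register((k, v) -> seen.add(k + "=" + v), initIterRequired);

        cache.put("b", 2); // Live update after registration.

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(true));  // prints [a=1, b=2]
        System.out.println(run(false)); // prints [b=2]
    }
}

/** Hypothetical stand-in for a cache; not the Ignite or JCache API. */
class ToyCache<K, V> {
    private final Map<K, V> data = new LinkedHashMap<>();

    private final List<BiConsumer<K, V>> listeners = new ArrayList<>();

    /**
     * Registers a listener. If initIterRequired is true, entries already in
     * the cache are replayed to the same listener before any live update.
     */
    void register(BiConsumer<K, V> listener, boolean initIterRequired) {
        if (initIterRequired)
            data.forEach(listener);

        listeners.add(listener);
    }

    /** Stores the value and notifies every registered listener. */
    void put(K key, V val) {
        data.put(key, val);

        for (BiConsumer<K, V> l : listeners)
            l.accept(key, val);
    }
}
```

With the flag off, the listener only sees updates made after registration; with it on, the caller does not need a separate scan to learn the existing state.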
To support transformers, we can add one more extension. I created a separate class
because the generics are more complicated:
{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}
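As a rough illustration of what the transformer is for, the sketch below maps each value of type {{V}} to a result of type {{R}} before the listener sees it. It is plain JDK code: {{java.util.function.Function}} stands in for {{IgniteClosure}}, and {{transforming}} is a hypothetical helper, not part of the proposal. Note that the sketch declares the result side as {{? extends R}}, the usual variance for a function's return type, so that the result can be handed to a listener of {{R}}; whether the {{? super R}} in the proposal above is intended is not something this sketch settles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Demo of a transformer sitting between updates of V and a listener of R. */
class TransformDemo {
    /** Hypothetical helper: wraps a listener of R so it can be fed values of type V. */
    static <V, R> Consumer<V> transforming(
        Function<? super V, ? extends R> transformer,
        Consumer<? super R> listener) {
        return val -> listener.accept(transformer.apply(val));
    }

    /** Feeds string updates through a length transformer and returns what the listener saw. */
    static List<Integer> run(List<String> updates) {
        List<Integer> seen = new ArrayList<>();

        // V = String (the cached value), R = Integer (what the listener receives).
        Consumer<String> onUpdate =
            TransformDemo.<String, Integer>transforming(s -> s.length(), r -> seen.add(r));

        for (String u : updates)
            onUpdate.accept(u);

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("one", "three"))); // prints [3, 5]
    }
}
```

The listener here is typed on {{R}} only, which matches the listener factory in the proposal being parameterized with {{R}} rather than {{V}}.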
{{IgniteCache}} should have two new methods:
{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);

public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}
The returned {{AutoCloseable}} instance allows the caller to deregister the query.
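One consequence of returning {{AutoCloseable}} is that a query can be scoped with try-with-resources. The sketch below shows the idea with plain JDK types; {{ToyQueryCache}} and its single-argument {{queryContinuous}} are hypothetical stand-ins that take a listener directly instead of a configuration object:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Demo of deregistering a listener through the returned AutoCloseable. */
class DeregistrationDemo {
    /** Returns the updates delivered while the query was registered. */
    static List<String> run() {
        ToyQueryCache<String> cache = new ToyQueryCache<>();

        List<String> seen = new ArrayList<>();

        try (AutoCloseable qry = cache.queryContinuous(seen::add)) {
            cache.update("first"); // Delivered: the query is still registered.
        }
        catch (Exception e) {
            // AutoCloseable.close() is declared to throw Exception.
            throw new IllegalStateException(e);
        }

        cache.update("second"); // Not delivered: the query was closed.

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [first]
    }
}

/** Hypothetical stand-in for a cache; not the Ignite API. */
class ToyQueryCache<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();

    /** Registers the listener and returns a handle whose close() removes it. */
    AutoCloseable queryContinuous(Consumer<V> listener) {
        listeners.add(listener);

        return () -> listeners.remove(listener);
    }

    /** Notifies a snapshot of the currently registered listeners. */
    void update(V val) {
        for (Consumer<V> l : new ArrayList<>(listeners))
            l.accept(val);
    }
}
```

The catch block is needed because {{AutoCloseable.close()}} is declared with {{throws Exception}}; a narrower return type would avoid that, but this sketch sticks to the type named in the proposal.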
was (Author: vkulichenko):
I'd like to suggest a preliminary API for continuous queries that will allow us to
implement all the changes listed in the subtasks of this ticket.
I think we should extend the {{CacheEntryListenerConfiguration}} available in
JCache and add a flag that defines whether an initial iteration is required
(the entries it produces will be delivered to the same listener):
{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}
To support transformers, we can add one more extension. I created a separate class
because the generics are more complicated:
{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super R>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}
{{IgniteCache}} should have two new methods:
{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);

public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}
The returned {{AutoCloseable}} instance allows the caller to deregister the query.
> Continuous Query Changes
> ------------------------
>
> Key: IGNITE-1191
> URL: https://issues.apache.org/jira/browse/IGNITE-1191
> Project: Ignite
> Issue Type: Improvement
> Components: cache
> Reporter: Dmitriy Setrakyan
>