[ 
https://issues.apache.org/jira/browse/NIFI-4105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069479#comment-16069479
 ] 

ASF GitHub Bot commented on NIFI-4105:
--------------------------------------

Github user ggthename commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1937#discussion_r124964237
  
    --- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
    @@ -75,19 +81,64 @@
     @Tags({"cassandra", "cql", "select"})
     @EventDriven
     @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    -@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
select query on a Cassandra 1.x, 2.x, or 3.0.x cluster. Query result "
    -        + "may be converted to Avro or JSON format. Streaming is used so 
arbitrarily large result sets are supported. This processor can be "
    +@CapabilityDescription("Executes provided Cassandra Query Language (CQL) 
select query on a Cassandra to fetch all rows whose values"
    +        + "in the specified Maximum Value column(s) are larger than the 
previously-seen maxima.Query result"
    +        + "may be converted to Avro, JSON or CSV format. Streaming is used 
so arbitrarily large result sets are supported. This processor can be "
             + "scheduled to run on a timer, or cron expression, using the 
standard scheduling methods, or it can be triggered by an incoming FlowFile. "
             + "If it is triggered by an incoming FlowFile, then attributes of 
that FlowFile will be available when evaluating the "
             + "select query. FlowFile attribute 'executecql.row.count' 
indicates how many rows were selected.")
    +@Stateful(scopes = Scope.CLUSTER, description = "After performing query, 
the maximum value of the specified column is stored, "
    +        + "fetch all rows whose values in the specified Maximum Value 
column(s) are larger than the previously-seen maximum"
    +        + "State is stored across the cluster so that the next time this 
Processor can be run with min and max values")
     @WritesAttributes({@WritesAttribute(attribute = "executecql.row.count", 
description = "The number of rows returned by the CQL query")})
     public class QueryCassandra extends AbstractCassandraProcessor {
     
    +    public static final String CSV_FORMAT = "CSV";
         public static final String AVRO_FORMAT = "Avro";
         public static final String JSON_FORMAT = "JSON";
     
    +    public static final String CASSANDRA_WATERMARK_MIN_VALUE_ID = 
"CASSANDRA_WATERMARK_MIN_VALUE_ID";
    +    public static final String CASSANDRA_WATERMARK_MAX_VALUE_ID = 
"CASSANDRA_WATERMARK_MAX_VALUE_ID";
    +
         public static final String RESULT_ROW_COUNT = "executecql.row.count";
     
    +    public static final PropertyDescriptor INIT_WATERMARK = new 
PropertyDescriptor.Builder().name("Initial Watermark Value")
    +            .description("Use it only once.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor BACKOFF_PERIOD = new 
PropertyDescriptor.Builder()
    +            .name("Backoff Period")
    +            .description("Only records older than the backoff period will 
be eligible for pickup. This can be used in the ILM use case to define a 
retention period.")
    +            .defaultValue("10 seconds")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor OVERLAP_TIME = new 
PropertyDescriptor.Builder()
    +            .name("Overlap Period")
    +            .description("Amount of time to overlap into the last load 
date to ensure long running transactions missed by previous load weren't 
missed. Recommended: >0s")
    +            .required(true)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("0 seconds")
    +            .build();
    +
    +    public static final PropertyDescriptor DATE_FIELD = new 
PropertyDescriptor.Builder()
    --- End diff --
    
    I think it's a good idea, but a little bit complex. 
    because we usually want to concentrate the time of record when we use the 
incremental fetch feature. ( To avoid omissions )
    
    So there are the concepts of BACKOFF_PERIOD and OVERLAP_TIME.
    It is difficult to apply the other type column.
    
    In my opinion, a date column is most appropriate for  the incremental fetch 
feature.


> support the specified Maximum value column and CSV Stream for Cassandra
> -----------------------------------------------------------------------
>
>                 Key: NIFI-4105
>                 URL: https://issues.apache.org/jira/browse/NIFI-4105
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 1.3.0
>            Reporter: Yoonwon Ko
>
> I'm trying to find a CassandraProcessor to fetch rows whose values in the 
> specified Maximum Value columns are larger than the previously-seen maximum 
> like QueryDatabaseTable.
> But I found only QueryCassandra. It just executes same CQL everytime without 
> keeping maximum value.
> and I think we also need convertToCsvStream option.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to