Github user pvillard31 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1937#discussion_r124734304
--- Diff:
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
---
@@ -75,19 +81,64 @@
@Tags({"cassandra", "cql", "select"})
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
-@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra 1.x, 2.x, or 3.0.x cluster. Query result "
- + "may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be "
+@CapabilityDescription("Executes provided Cassandra Query Language (CQL) select query on a Cassandra to fetch all rows whose values"
+ + "in the specified Maximum Value column(s) are larger than the previously-seen maxima.Query result"
+ + "may be converted to Avro, JSON or CSV format. Streaming is used so arbitrarily large result sets are supported. This processor can be "
+ "scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+ "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+ "select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing query, the maximum value of the specified column is stored, "
+ + "fetch all rows whose values in the specified Maximum Value column(s) are larger than the previously-seen maximum"
+ + "State is stored across the cluster so that the next time this Processor can be run with min and max values")
@WritesAttributes({@WritesAttribute(attribute = "executecql.row.count", description = "The number of rows returned by the CQL query")})
public class QueryCassandra extends AbstractCassandraProcessor {
+ public static final String CSV_FORMAT = "CSV";
public static final String AVRO_FORMAT = "Avro";
public static final String JSON_FORMAT = "JSON";
+ public static final String CASSANDRA_WATERMARK_MIN_VALUE_ID = "CASSANDRA_WATERMARK_MIN_VALUE_ID";
+ public static final String CASSANDRA_WATERMARK_MAX_VALUE_ID = "CASSANDRA_WATERMARK_MAX_VALUE_ID";
+
public static final String RESULT_ROW_COUNT = "executecql.row.count";
+ public static final PropertyDescriptor INIT_WATERMARK = new PropertyDescriptor.Builder().name("Initial Watermark Value")
+ .description("Use it only once.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BACKOFF_PERIOD = new PropertyDescriptor.Builder()
+ .name("Backoff Period")
+ .description("Only records older than the backoff period will be eligible for pickup. This can be used in the ILM use case to define a retention period.")
+ .defaultValue("10 seconds")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor OVERLAP_TIME = new PropertyDescriptor.Builder()
+ .name("Overlap Period")
+ .description("Amount of time to overlap into the last load date to ensure long running transactions missed by previous load weren't missed. Recommended: >0s")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("0 seconds")
+ .build();
+
+ public static final PropertyDescriptor DATE_FIELD = new PropertyDescriptor.Builder()
--- End diff --
You're only accepting a date column here, but couldn't the watermark column also be an ID?
Also, see my comment about potentially using multiple columns.
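
To illustrate the idea, here is a minimal sketch of tracking a maximum per column without assuming the column is a date. It is only an illustration under assumptions: `WatermarkTracker`, `observe` and `max` are hypothetical names, not existing NiFi or PR code, and the column names in `main` are made up.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of the PR or of NiFi): keeps the largest
 * value seen per watermark column, for any Comparable type such as a
 * timestamp or a numeric ID.
 */
public class WatermarkTracker {

    // Column name -> largest value observed so far.
    private final Map<String, Comparable<?>> maxima = new LinkedHashMap<>();

    /**
     * Records a value for a column and returns true if it raised that
     * column's watermark. Values for one column must share a type;
     * mixing types throws ClassCastException from compareTo.
     */
    @SuppressWarnings("unchecked")
    public boolean observe(String column, Comparable<?> value) {
        Comparable<Object> previous = (Comparable<Object>) maxima.get(column);
        if (previous == null || previous.compareTo(value) < 0) {
            maxima.put(column, value);
            return true;
        }
        return false;
    }

    /** Returns the current maximum for a column, or null if none was seen. */
    public Comparable<?> max(String column) {
        return maxima.get(column);
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker();

        // A numeric ID column: the smaller value does not lower the watermark.
        tracker.observe("id", 10L);
        tracker.observe("id", 7L);

        // A date column tracked by the same code path.
        tracker.observe("updated_at", Instant.parse("2017-06-01T00:00:00Z"));
        tracker.observe("updated_at", Instant.parse("2017-06-29T00:00:00Z"));

        System.out.println("id -> " + tracker.max("id"));
        System.out.println("updated_at -> " + tracker.max("updated_at"));
    }
}
```

Keying the state by column name is what would let several Maximum Value columns be tracked side by side, with each entry stored in the processor state under its own key.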
---