Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2199#discussion_r145696508
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
---
@@ -170,159 +210,213 @@ protected void init(final
ProcessorInitializationContext context) {
return this.descriptors;
}
+ final static Set<String> propertyNamesForActivatingClearState = new
HashSet<String>();
+ static {
+ propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+ propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+ propertyNamesForActivatingClearState.add(COLLECTION.getName());
+ propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+ propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+ propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+ }
+
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
- lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+ if
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+ clearState.set(true);
}
- @OnStopped
- public void onStopped() {
- writeLastEndDate();
- }
+ @OnScheduled
+ public void clearState(final ProcessContext context) throws
IOException {
+ if (clearState.getAndSet(false)) {
+ context.getStateManager().clear(Scope.CLUSTER);
+ final Map<String,String> newStateMap = new
HashMap<String,String>();
- @OnRemoved
- public void onRemoved() {
- final File lastEndDateCache = new File(FILE_PREFIX +
getIdentifier());
- if (lastEndDateCache.exists()) {
- lastEndDateCache.delete();
- }
- }
+ newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- final ComponentLog logger = getLogger();
- readLastEndDate();
-
- final SimpleDateFormat sdf = new
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
- final String currDate = sdf.format(new Date());
-
- final boolean initialized =
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
- final String query = context.getProperty(SOLR_QUERY).getValue();
- final SolrQuery solrQuery = new SolrQuery(query);
- solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
- // if initialized then apply a filter to restrict results from the
last end time til now
- if (initialized) {
- StringBuilder filterQuery = new StringBuilder();
- filterQuery.append(context.getProperty(DATE_FIELD).getValue())
- .append(":{").append(lastEndDatedRef.get()).append("
TO ")
- .append(currDate).append("]");
- solrQuery.addFilterQuery(filterQuery.toString());
- logger.info("Applying filter query {}", new
Object[]{filterQuery.toString()});
- }
+ final String initialDate =
context.getProperty(DATE_FILTER).getValue();
+ if (StringUtils.isBlank(initialDate))
+ newStateMap.put(STATE_MANAGER_FILTER, "*");
+ else
+ newStateMap.put(STATE_MANAGER_FILTER, initialDate);
- final String returnFields =
context.getProperty(RETURN_FIELDS).getValue();
- if (returnFields != null && !returnFields.trim().isEmpty()) {
- for (String returnField : returnFields.trim().split("[,]")) {
- solrQuery.addField(returnField.trim());
- }
+ context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+ id_field = null;
}
+ }
- final String fullSortClause =
context.getProperty(SORT_CLAUSE).getValue();
- if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
- for (String sortClause : fullSortClause.split("[,]")) {
- String[] sortParts = sortClause.trim().split("[ ]");
- solrQuery.addSort(sortParts[0],
SolrQuery.ORDER.valueOf(sortParts[1]));
- }
+ @Override
+ protected final Collection<ValidationResult>
additionalCustomValidation(ValidationContext context) {
+ final Collection<ValidationResult> problems = new ArrayList<>();
+
+ if
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+ && !context.getProperty(RECORD_WRITER).isSet()) {
+ problems.add(new ValidationResult.Builder()
+ .explanation("for parsing records a record writer has
to be configured")
--- End diff --
Should `for parsing records a ...` be `for writing records a ...`? Parsing
is what happens when reading record-formatted data, not when writing it.
---