[
https://issues.apache.org/jira/browse/NIFI-3248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211048#comment-16211048
]
ASF GitHub Bot commented on NIFI-3248:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2199#discussion_r145697961
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java
---
@@ -170,159 +210,213 @@ protected void init(final
ProcessorInitializationContext context) {
return this.descriptors;
}
+ final static Set<String> propertyNamesForActivatingClearState = new
HashSet<String>();
+ static {
+ propertyNamesForActivatingClearState.add(SOLR_TYPE.getName());
+ propertyNamesForActivatingClearState.add(SOLR_LOCATION.getName());
+ propertyNamesForActivatingClearState.add(COLLECTION.getName());
+ propertyNamesForActivatingClearState.add(SOLR_QUERY.getName());
+ propertyNamesForActivatingClearState.add(DATE_FIELD.getName());
+ propertyNamesForActivatingClearState.add(RETURN_FIELDS.getName());
+ }
+
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
- lastEndDatedRef.set(UNINITIALIZED_LAST_END_DATE_VALUE);
+ if
(propertyNamesForActivatingClearState.contains(descriptor.getName()))
+ clearState.set(true);
}
- @OnStopped
- public void onStopped() {
- writeLastEndDate();
- }
+ @OnScheduled
+ public void clearState(final ProcessContext context) throws
IOException {
+ if (clearState.getAndSet(false)) {
+ context.getStateManager().clear(Scope.CLUSTER);
+ final Map<String,String> newStateMap = new
HashMap<String,String>();
- @OnRemoved
- public void onRemoved() {
- final File lastEndDateCache = new File(FILE_PREFIX +
getIdentifier());
- if (lastEndDateCache.exists()) {
- lastEndDateCache.delete();
- }
- }
+ newStateMap.put(STATE_MANAGER_CURSOR_MARK, "*");
- @Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- final ComponentLog logger = getLogger();
- readLastEndDate();
-
- final SimpleDateFormat sdf = new
SimpleDateFormat(LAST_END_DATE_PATTERN, Locale.US);
- sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
- final String currDate = sdf.format(new Date());
-
- final boolean initialized =
!UNINITIALIZED_LAST_END_DATE_VALUE.equals(lastEndDatedRef.get());
-
- final String query = context.getProperty(SOLR_QUERY).getValue();
- final SolrQuery solrQuery = new SolrQuery(query);
- solrQuery.setRows(context.getProperty(BATCH_SIZE).asInteger());
-
- // if initialized then apply a filter to restrict results from the
last end time til now
- if (initialized) {
- StringBuilder filterQuery = new StringBuilder();
- filterQuery.append(context.getProperty(DATE_FIELD).getValue())
- .append(":{").append(lastEndDatedRef.get()).append("
TO ")
- .append(currDate).append("]");
- solrQuery.addFilterQuery(filterQuery.toString());
- logger.info("Applying filter query {}", new
Object[]{filterQuery.toString()});
- }
+ final String initialDate =
context.getProperty(DATE_FILTER).getValue();
+ if (StringUtils.isBlank(initialDate))
+ newStateMap.put(STATE_MANAGER_FILTER, "*");
+ else
+ newStateMap.put(STATE_MANAGER_FILTER, initialDate);
- final String returnFields =
context.getProperty(RETURN_FIELDS).getValue();
- if (returnFields != null && !returnFields.trim().isEmpty()) {
- for (String returnField : returnFields.trim().split("[,]")) {
- solrQuery.addField(returnField.trim());
- }
+ context.getStateManager().setState(newStateMap, Scope.CLUSTER);
+
+ id_field = null;
}
+ }
- final String fullSortClause =
context.getProperty(SORT_CLAUSE).getValue();
- if (fullSortClause != null && !fullSortClause.trim().isEmpty()) {
- for (String sortClause : fullSortClause.split("[,]")) {
- String[] sortParts = sortClause.trim().split("[ ]");
- solrQuery.addSort(sortParts[0],
SolrQuery.ORDER.valueOf(sortParts[1]));
- }
+ @Override
+ protected final Collection<ValidationResult>
additionalCustomValidation(ValidationContext context) {
+ final Collection<ValidationResult> problems = new ArrayList<>();
+
+ if
(context.getProperty(RETURN_TYPE).evaluateAttributeExpressions().getValue().equals(MODE_REC.getValue())
+ && !context.getProperty(RECORD_WRITER).isSet()) {
+ problems.add(new ValidationResult.Builder()
+ .explanation("for parsing records a record writer has
to be configured")
+ .valid(false)
+ .subject("Record writer check")
+ .build());
}
+ return problems;
+ }
+ private String getFieldNameOfUniqueKey() {
+ final SolrQuery solrQuery = new SolrQuery();
try {
+ solrQuery.setRequestHandler("/schema/uniquekey");
final QueryRequest req = new QueryRequest(solrQuery);
if (isBasicAuthEnabled()) {
req.setBasicAuthCredentials(getUsername(), getPassword());
}
- // run the initial query and send out the first page of results
- final StopWatch stopWatch = new StopWatch(true);
- QueryResponse response = req.process(getSolrClient());
- stopWatch.stop();
-
- long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+
return(req.process(getSolrClient()).getResponse().get("uniqueKey").toString());
+ } catch (SolrServerException | IOException e) {
+ getLogger().error("Solr query to retrieve uniqueKey-field
failed due to {}", new Object[]{solrQuery.toString(), e}, e);
+ throw new ProcessException(e);
+ }
+ }
- final SolrDocumentList documentList = response.getResults();
- logger.info("Retrieved {} results from Solr for {} in {} ms",
- new Object[] {documentList.getNumFound(), query,
duration});
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
- if (documentList != null && documentList.getNumFound() > 0) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, new
QueryResponseOutputStreamCallback(response));
- flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), "application/xml");
- session.transfer(flowFile, REL_SUCCESS);
+ final ComponentLog logger = getLogger();
+ final AtomicBoolean continuePaging = new AtomicBoolean(true);
+ final SolrQuery solrQuery = new SolrQuery();
- StringBuilder transitUri = new StringBuilder("solr://");
- transitUri.append(getSolrLocation());
- if
(SOLR_TYPE_CLOUD.equals(context.getProperty(SOLR_TYPE).getValue())) {
-
transitUri.append("/").append(context.getProperty(COLLECTION).getValue());
- }
+ try {
+ if (id_field == null) {
+ id_field = getFieldNameOfUniqueKey();
+ }
- session.getProvenanceReporter().receive(flowFile,
transitUri.toString(), duration);
+ final String dateField =
context.getProperty(DATE_FIELD).getValue();
- // if initialized then page through the results and send
out each page
- if (initialized) {
- int endRow = response.getResults().size();
- long totalResults =
response.getResults().getNumFound();
+ solrQuery.setQuery("*:*");
+ final String query =
context.getProperty(SOLR_QUERY).getValue();
+ if (!StringUtils.isBlank(query) && !query.equals("*:*")) {
+ solrQuery.addFilterQuery(query);
+ }
+ final StringBuilder automatedFilterQuery = (new
StringBuilder())
+ .append(dateField)
+ .append(":[")
+
.append(context.getStateManager().getState(Scope.CLUSTER).get(STATE_MANAGER_FILTER))
+ .append(" TO *]");
--- End diff --
When I reran the GetSolr processor after clearing its state, I got the following
error; we should probably handle the situation where the state doesn't contain
STATE_MANAGER_FILTER here:
```
2017-10-19 22:22:54,827 ERROR [Timer-Driven Process Thread-4]
org.apache.nifi.processors.solr.GetSolr GetSolr[id=34a73c1d-015f-10
00-6121-dfeaf41fe595] GetSolr[id=34a73c1d-015f-1000-6121-dfeaf41fe595]
failed to process due to org.apache.solr.client.solrj.impl
.HttpSolrClient$RemoteSolrException: Error from server at
http://192.168.99.1:8983/solr/techproducts: Invalid Date String:'null';
rolling back session: {}
org.apache.solr.client.solrj.impl.HttpSolrClient$RemoteSolrException: Error
from server at http://192.168.99.1:8983/solr/techprod
ucts: Invalid Date String:'null'
at
org.apache.solr.client.solrj.impl.HttpSolrClient.executeMethod(HttpSolrClient.java:592)
at
org.apache.solr.client.solrj.impl.HttpSolrClient.request(HttpSolrClient.java:261)
at
org.apache.solr.client.solrj.impl.HttpSolrClient.request(HttpSolrClient.java:250)
at
org.apache.solr.client.solrj.impl.LBHttpSolrClient.doRequest(LBHttpSolrClient.java:403)
at
org.apache.solr.client.solrj.impl.LBHttpSolrClient.request(LBHttpSolrClient.java:355)
at
org.apache.solr.client.solrj.impl.CloudSolrClient.sendRequest(CloudSolrClient.java:1291)
at
org.apache.solr.client.solrj.impl.CloudSolrClient.requestWithRetryOnStaleState(CloudSolrClient.java:1061)
at
org.apache.solr.client.solrj.impl.CloudSolrClient.request(CloudSolrClient.java:997)
at
org.apache.solr.client.solrj.SolrRequest.process(SolrRequest.java:149)
at
org.apache.solr.client.solrj.SolrRequest.process(SolrRequest.java:166)
at
org.apache.nifi.processors.solr.GetSolr.onTrigger(GetSolr.java:336)
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
> GetSolr can miss recently updated documents
> -------------------------------------------
>
> Key: NIFI-3248
> URL: https://issues.apache.org/jira/browse/NIFI-3248
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.0.0, 0.5.0, 0.6.0, 0.5.1, 0.7.0, 0.6.1, 1.1.0, 0.7.1,
> 1.0.1
> Reporter: Koji Kawamura
> Assignee: Johannes Peter
> Attachments: nifi-flow.png, query-result-with-curly-bracket.png,
> query-result-with-square-bracket.png
>
>
> GetSolr holds the last query timestamp so that it only fetches documents
> that have been added or updated since the last query.
> However, GetSolr misses some of those updated documents, and once a
> document's date field value becomes older than the last query timestamp, the
> document can no longer be queried by GetSolr at all.
> This JIRA is for tracking the process of investigating this behavior, and
> discussion on them.
> Here are things that can be a cause of this behavior:
> |#|Short description|Should we address it?|
> |1|Timestamp range filter, curly or square bracket?|No|
> |2|Timezone difference between update and query|Additional docs might be
> helpful|
> |3|Lag comes from the NearRealTime nature of Solr|Should be documented at
> least, add 'commit lag-time'?|
> h2. 1. Timestamp range filter, curly or square bracket?
> At first glance, using curly and square brackets in combination looked strange
> ([source
> code|https://github.com/apache/nifi/blob/support/nifi-0.5.x/nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/GetSolr.java#L202]).
> But this difference has a meaning.
> The square bracket on the range query is inclusive and the curly bracket is
> exclusive. If we use inclusive on both sides and a document has a time stamp
> exactly on the boundary then it could be returned in two consecutive
> executions, and we only want it in one.
> This is intentional, and it should be as it is.
> h2. 2. Timezone difference between update and query
> Solr treats date fields as a [UTC
> representation|https://cwiki.apache.org/confluence/display/solr/Working+with+Dates].
> If date field String value of an updated document represents time without
> timezone, and NiFi is running on an environment using timezone other than
> UTC, GetSolr can't perform date range query as users expect.
> Let's say NiFi is running with JST(UTC+9). A process added a document to Solr
> at 15:00 JST. But the date field doesn't have timezone. So, Solr indexed it
> as 15:00 UTC. Then GetSolr performs range query at 15:10 JST, targeting any
> documents updated from 15:00 to 15:10 JST. GetSolr formatted dates using UTC,
> i.e. 6:00 to 6:10 UTC. The updated document won't be matched with the date
> range filter.
> To avoid this, updated documents must have proper timezone in date field
> string representation.
> If one uses NiFi expression language to set current timestamp to that date
> field, following NiFi expression can be used:
> {code}
> ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSZ")}
> {code}
> It will produce a result like:
> {code}
> 2016-12-27T15:30:04.895+0900
> {code}
> Then it will be indexed in Solr with UTC and will be queried by GetSolr as
> expected.
> h2. 3. Lag comes from the NearRealTime nature of Solr
> Solr provides Near Real Time search capability, that means, the recently
> updated documents can be queried in Near Real Time, but it's not real time.
> This latency can be controlled by either on client side which requests the
> update operation by specifying "commitWithin" parameter, or on the Solr
> server side, "autoCommit" and "autoSoftCommit" in
> [solrconfig.xml|https://cwiki.apache.org/confluence/display/solr/UpdateHandlers+in+SolrConfig#UpdateHandlersinSolrConfig-Commits].
> Since committing and updating the index can be costly, it's recommended to set
> this interval as long as possible, up to the maximum tolerable latency.
> However, this can be problematic with GetSolr. For instance, as shown in the
> simple NiFi flow below, GetSolr can miss updated documents:
> {code}
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from t1 to t4, but the new doc hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from t4 to t6, the doc didn't match query
> {code}
> This behavior should be at least documented.
> Plus, it would be helpful to add a new configuration property to GetSolr, to
> specify commit lag-time so that GetSolr aims older timestamp range to query
> documents.
> {code}
> // with commit lag-time
> t1: GetSolr queried
> t2: GenerateFlowFile set date = t2
> t3: PutSolrContentStream stored new doc
> t4: GetSolr queried again, from (t1 - lag) to (t4 - lag), but the new doc
> hasn't been indexed
> t5: Solr completed index
> t6: GetSolr queried again, from (t4 - lag) to (t6 - lag), the doc can match
> query
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)