[
https://issues.apache.org/jira/browse/NIFI-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423105#comment-16423105
]
ASF GitHub Bot commented on NIFI-4035:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2561#discussion_r178643673
--- Diff:
nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/main/java/org/apache/nifi/processors/solr/PutSolrRecord.java
---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.solr;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_PASSWORD;
+import static org.apache.nifi.processors.solr.SolrUtils.BASIC_USERNAME;
+import static org.apache.nifi.processors.solr.SolrUtils.COLLECTION;
+import static
org.apache.nifi.processors.solr.SolrUtils.JAAS_CLIENT_APP_NAME;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_LOCATION;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_MAX_CONNECTIONS_PER_HOST;
+import static
org.apache.nifi.processors.solr.SolrUtils.SOLR_SOCKET_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE;
+import static org.apache.nifi.processors.solr.SolrUtils.SOLR_TYPE_CLOUD;
+import static
org.apache.nifi.processors.solr.SolrUtils.SSL_CONTEXT_SERVICE;
+import static org.apache.nifi.processors.solr.SolrUtils.ZK_CLIENT_TIMEOUT;
+import static
org.apache.nifi.processors.solr.SolrUtils.ZK_CONNECTION_TIMEOUT;
+import static org.apache.nifi.processors.solr.SolrUtils.writeRecord;
+
+
+@Tags({"Apache", "Solr", "Put", "Send","Record"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Indexes the Records from a FlowFile into Solr")
+@DynamicProperty(name="A Solr request parameter name", value="A Solr
request parameter value",
+ description="These parameters will be passed to Solr on the
request")
+public class PutSolrRecord extends SolrProcessor {
+
+ public static final PropertyDescriptor UPDATE_PATH = new
PropertyDescriptor
+ .Builder().name("Solr Update Path").displayName("Solr Update
Path")
+ .description("The path in Solr to post the Flowfile Records")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("/update")
+ .build();
+
+ public static final PropertyDescriptor FIELDS_TO_INDEX = new
PropertyDescriptor
+ .Builder().name("Fields To Index").displayName("Fields To
Index")
+ .displayName("Fields To Index")
+ .description("Comma-separated list of field names to write")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor COMMIT_WITHIN = new
PropertyDescriptor
+ .Builder().name("Commit Within").displayName("Commit Within")
+ .description("The number of milliseconds before the given
update is committed")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("5000")
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor
+ .Builder().name("Batch Size").displayName("Batch Size")
+ .description("The number of solr documents to index per batch")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("500")
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("The original FlowFile")
+ .build();
+
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed for any reason other than
Solr being unreachable")
+ .build();
+
+ public static final Relationship REL_CONNECTION_FAILURE = new
Relationship.Builder()
+ .name("connection_failure")
+ .description("FlowFiles that failed because Solr is
unreachable")
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+
.name("put-solr-record-record-reader").displayName("put-solr-record-record-reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for
parsing incoming data and determining the data's schema.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
    // Solr request parameter naming the target collection.
    public static final String COLLECTION_PARAM_NAME = "collection";
    // Solr request parameter carrying the commit-within interval (ms).
    public static final String COMMIT_WITHIN_PARAM_NAME = "commitWithin";
    // Matches repeated request parameters of the form "name.N" (e.g. "f.1").
    public static final String REPEATING_PARAM_PATTERN = "\\w+\\.\\d+";
    // NOTE(review): getLogger() is invoked here at field-initialization time,
    // before the processor's initialization context has been supplied, so this
    // may capture null. Prefer calling getLogger() directly at each use site
    // (as requested in the PR review) rather than caching it in a field.
    public final ComponentLog logger = getLogger();

    private Set<Relationship> relationships;       // populated once in init()
    private List<PropertyDescriptor> descriptors;  // populated once in init()
    // Sentinel passed to writeRecord() for the top-level (un-prefixed) fields.
    private static final String EMPTY_STRING = "";
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ super.init(context);
+
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(SOLR_TYPE);
+ descriptors.add(SOLR_LOCATION);
+ descriptors.add(COLLECTION);
+ descriptors.add(UPDATE_PATH);
+ descriptors.add(RECORD_READER);
+ descriptors.add(FIELDS_TO_INDEX);
+ descriptors.add(COMMIT_WITHIN);
+ descriptors.add(JAAS_CLIENT_APP_NAME);
+ descriptors.add(BASIC_USERNAME);
+ descriptors.add(BASIC_PASSWORD);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(SOLR_SOCKET_TIMEOUT);
+ descriptors.add(SOLR_CONNECTION_TIMEOUT);
+ descriptors.add(SOLR_MAX_CONNECTIONS);
+ descriptors.add(SOLR_MAX_CONNECTIONS_PER_HOST);
+ descriptors.add(ZK_CLIENT_TIMEOUT);
+ descriptors.add(ZK_CONNECTION_TIMEOUT);
+ descriptors.add(BATCH_SIZE);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_CONNECTION_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return this.descriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .description("Specifies the value to send for the '" +
propertyDescriptorName + "' request parameter")
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamic(true)
+ .expressionLanguageSupported(true)
+ .build();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ final AtomicReference<Exception> error = new
AtomicReference<>(null);
+ final AtomicReference<Exception> connectionError = new
AtomicReference<>(null);
+
+ final boolean isSolrCloud =
SOLR_TYPE_CLOUD.getValue().equals(context.getProperty(SOLR_TYPE).getValue());
+ final String collection =
context.getProperty(COLLECTION).evaluateAttributeExpressions(flowFile).getValue();
+ final Long commitWithin =
context.getProperty(COMMIT_WITHIN).evaluateAttributeExpressions(flowFile).asLong();
+ final String contentStreamPath =
context.getProperty(UPDATE_PATH).evaluateAttributeExpressions(flowFile).getValue();
+ final MultiMapSolrParams requestParams = new
MultiMapSolrParams(getRequestParams(context, flowFile));
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final String fieldsToIndex =
context.getProperty(FIELDS_TO_INDEX).evaluateAttributeExpressions(flowFile).getValue();
+ final Long batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asLong();
+
+ final List<String> fieldList = new ArrayList<>();
+ if (!StringUtils.isBlank(fieldsToIndex)) {
+
fieldList.addAll(Arrays.asList(fieldsToIndex.trim().split("[,]")));
+ }
+ StopWatch timer = new StopWatch(true);
+ try (final InputStream in = session.read(flowFile);
+ final RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+ Record record;
+ List<SolrInputDocument> inputDocumentList = new LinkedList<>();
+ try {
+ while ((record = reader.nextRecord()) != null) {
+ SolrInputDocument inputDoc = new SolrInputDocument();
+ writeRecord(record, inputDoc,fieldList,EMPTY_STRING);
+ inputDocumentList.add(inputDoc);
+ if(inputDocumentList.size()==batchSize) {
+ index(isSolrCloud, collection, commitWithin,
contentStreamPath, requestParams, inputDocumentList);
+ inputDocumentList = new ArrayList<>();
+ }
+ index(isSolrCloud, collection, commitWithin,
contentStreamPath, requestParams, inputDocumentList);
+ }
+ } catch (SolrException e) {
+ error.set(e);
+ } catch (SolrServerException e) {
+ if (causedByIOException(e)) {
+ //Exit in case of a solr connection error
+ connectionError.set(e);
+ } else {
+ error.set(e);
+ }
+ } catch (IOException e) {
+ //Exit in case of a solr connection error
+ connectionError.set(e);
+ }
+ } catch (final IOException | SchemaNotFoundException |
MalformedRecordException e) {
+ logger.error("Could not parse incoming data", e);
--- End diff --
Change to getLogger().error
> Implement record-based Solr processors
> --------------------------------------
>
> Key: NIFI-4035
> URL: https://issues.apache.org/jira/browse/NIFI-4035
> Project: Apache NiFi
> Issue Type: Improvement
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Bryan Bende
> Priority: Minor
>
> Now that we have record readers and writers, we should implement variants of
> the existing Solr processors that are record-based...
> Processors to consider:
> * PutSolrRecord - uses a configured record reader to read an incoming flow
> file and insert records to Solr
> * GetSolrRecord - extracts records from Solr and uses a configured record
> writer to write them to a flow file
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)