Hi all,

I’m following up on this thread: I did some more testing, and actually the optimization problem was on our side.

The repository connector in question was CSV and the problem was that getMaxDocumentRequest() in CSVConnector.java was set to 1, so the processDocuments() method was processing documents one by one. I have now set it to 20 by default, and the performance has improved greatly.

Attached is modified class.

Regards,
Guylaine

France Labs – Your knowledge, now
Datafari Enterprise Search – Découvrez la version 5 / Discover our version 5
www.datafari.com <http://www.datafari.com>


On 2023/03/17 17:36:47 Julien Massiera wrote:
> Hi Karl
>
>
>
> I was debugging a repository connector because I was disappointed with the > performance, and I noticed that the processDocuments method is called each > time with only 1 document identifier instead of a heap, although the seeding
> phase has referenced 24k ids. What can explain that ? Can we have control
> over the amount of documentIdentifiers passed per processDocuments thread ? > For instance, assuming we have the perfect number of documents that an API
> can process at once, it would be very useful to be able to set it per
> thread.
>
>
>
> Other thing, I also noticed that the seed phase and the cleanup phase seem > to process documents per group of 100/200 at a time, again, is it configured
> somewhere, and can we have control over it ?
>
>
>
> Thanks,
>
> Julien
>
>
>
>
>
>
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.manifoldcf.crawler.connectors.csv;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.manifoldcf.agents.interfaces.RepositoryDocument;
import org.apache.manifoldcf.agents.interfaces.ServiceInterruption;
import org.apache.manifoldcf.core.interfaces.ConfigParams;
import org.apache.manifoldcf.core.interfaces.IHTTPOutput;
import org.apache.manifoldcf.core.interfaces.IPostParameters;
import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.core.interfaces.Specification;
import org.apache.manifoldcf.core.interfaces.SpecificationNode;
import org.apache.manifoldcf.crawler.connectors.BaseRepositoryConnector;
import org.apache.manifoldcf.crawler.interfaces.IExistingVersions;
import org.apache.manifoldcf.crawler.interfaces.IProcessActivity;
import org.apache.manifoldcf.crawler.interfaces.ISeedingActivity;

public class CSVConnector extends BaseRepositoryConnector {

  private static final Logger LOGGER = LogManager.getLogger(CSVConnector.class.getName());
  private static Level DOCPROCESSLEVEL = Level.forName("DOCPROCESS", 450);
  private static final String EDIT_SPECIFICATION_JS = "editSpecification.js";
  private static final String EDIT_SPECIFICATION_CSV_HTML = "editSpecification_CSV.html";
  private static final String VIEW_SPECIFICATION_CSV_HTML = "viewSpecification_CSV.html";

  protected final static String ACTIVITY_READ = "read";
  private static final String DOCUMENT_ID_SEPARATOR = ";;";

  /**
   * Constructor.
   */
  public CSVConnector() {
  }

  @Override
  public int getMaxDocumentRequest() {
    return 20;
  }

  @Override
  public int getConnectorModel() {
    return CSVConnector.MODEL_ADD_CHANGE_DELETE;
  }

  @Override
  public String[] getActivitiesList() {
    return new String[] { ACTIVITY_READ };
  }

  /**
   * For any given document, list the bins that it is a member of.
   */
  @Override
  public String[] getBinNames(final String documentIdentifier) {
    // Return the host name
    return new String[] { "CSV" };
  }

  // All methods below this line will ONLY be called if a connect() call succeeded
  // on this instance!
  /**
   * Connect. The configuration parameters are included.
   *
   * @param configParams are the configuration parameters for this connection. Note well: There are no exceptions allowed from this call, since it is expected to mainly establish connection
   *                     parameters.
   */
  @Override
  public void connect(final ConfigParams configParams) {
    super.connect(configParams);

  }

  @Override
  public void disconnect() throws ManifoldCFException {
    super.disconnect();
  }

  @Override
  public String check() throws ManifoldCFException {
    return super.check();
  }

  @Override
  public String addSeedDocuments(final ISeedingActivity activities, final Specification spec, final String lastSeedVersion, final long seedTime, final int jobMode)
      throws ManifoldCFException, ServiceInterruption {

    long startTime;
    if (lastSeedVersion == null) {
      startTime = 0L;
    } else {
      // Unpack seed time from seed version string
      startTime = Long.parseLong(lastSeedVersion);
    }

    final CSVSpecs csvSpecs = new CSVSpecs(spec);
    final Map<String, String[]> csvMap = csvSpecs.getCSVMap();
    for (final String csvPath : csvMap.keySet()) {
      try {
        final long numberOfLines = CSVUtils.getCSVLinesNumber(csvPath);
        for (long i = 1L; i < numberOfLines; i++) {
          final String documentId = i + DOCUMENT_ID_SEPARATOR + csvPath;
          activities.addSeedDocument(documentId);
        }
      } catch (final IOException e) {
        throw new ManifoldCFException("Could not read CSV file " + csvPath + " : " + e.getMessage(), e);
      }
    }

    return String.valueOf(seedTime);

  }

  @Override
  public void processDocuments(final String[] documentIdentifiers, final IExistingVersions statuses, final Specification spec, final IProcessActivity activities, final int jobMode,
      final boolean usesDefaultAuthority) throws ManifoldCFException, ServiceInterruption {

    // Check if we should abort
    activities.checkJobStillActive();

    String errorCode = null;
    String description = "";
    final CSVSpecs csvSpecs = new CSVSpecs(spec);

    final long startFetchTime = System.currentTimeMillis();

    final String versionString = "";

    final Map<String, List<Long>> linesToReadPerDoc = new HashMap<>();

    for (final String documentIdentifier : documentIdentifiers) {
      LOGGER.log(DOCPROCESSLEVEL, "DOC_PROCESS_START|CSV|" + documentIdentifier);
      final String[] documentIdentifierArr = documentIdentifier.split(DOCUMENT_ID_SEPARATOR);
      final String lineToRead = documentIdentifierArr[0];
      final String docPath = documentIdentifierArr[1];
      if (linesToReadPerDoc.containsKey(docPath)) {
        linesToReadPerDoc.get(docPath).add(Long.parseLong(lineToRead));
      } else {
        final List<Long> linesToRead = new ArrayList<>();
        linesToRead.add(Long.parseLong(lineToRead));
        linesToReadPerDoc.put(docPath, linesToRead);
      }
    }

    for (final String docPath : linesToReadPerDoc.keySet()) {
      final String[] docLabels = csvSpecs.CSVMap.get(docPath);
      final Long[] linesToRead = linesToReadPerDoc.get(docPath).toArray(new Long[0]);
      // Sort lines to read so we can sequentially read the file
      Arrays.sort(linesToRead);
      final File csvFile = new File(docPath);
      long cptLine = 0;
      // init cptLinesToRead
      int cptLinesToRead = 0;
      // init lineToRead
      long lineToRead = linesToRead[cptLinesToRead];
      try (FileReader fr = new FileReader(csvFile); BufferedReader br = new BufferedReader(fr);) {
        String line = null;
        while ((line = br.readLine()) != null) {
          if (cptLine < lineToRead) {
            cptLine++;
            continue;
          }

          // Rebuild documentIdentifier (MCF id)
          final String documentIdentifier = lineToRead + DOCUMENT_ID_SEPARATOR + docPath;
          String ingestId = String.valueOf(lineToRead);
          final RepositoryDocument rd = new RepositoryDocument();
          byte[] contentBytes = null;
          final String[] values = line.split(csvSpecs.getSeparator());
          for (int i = 0; i < values.length; i++) {
            final String value = values[i];
            final String label = docLabels[i];
            if (label.contentEquals(csvSpecs.getContentColumnLabel())) {
              contentBytes = value.getBytes();
            } else {
              if (label.contentEquals(csvSpecs.getIdColumnLabel())) {
                ingestId = value;
              }
              rd.addField(label, value);
            }
          }

          if (versionString.length() == 0 || activities.checkDocumentNeedsReindexing(documentIdentifier, versionString)) {
            // Ingest document
            try (ByteArrayInputStream inputStream = new ByteArrayInputStream(contentBytes)) {
              rd.setBinary(inputStream, contentBytes.length);
              activities.ingestDocumentWithException(documentIdentifier, versionString, ingestId, rd);
              errorCode = "OK";
              activities.recordActivity(startFetchTime, ACTIVITY_READ, (long) contentBytes.length, documentIdentifier, errorCode, description, null);
            } finally {
              LOGGER.log(DOCPROCESSLEVEL, "DOC_PROCESS_END|CSV|" + documentIdentifier);
            }
          }

          // We just read a line to read, so search for the next line to read
          cptLinesToRead++;
          // If there is still line to read, then set lineToRead with the new value, otherwise we read all wanted lines so we can close the stream;
          if (cptLinesToRead < linesToRead.length) {
            lineToRead = linesToRead[cptLinesToRead];
          } else {
            // We have read all the linesToRead so we can stop reading the stream
            break;
          }
          cptLine++;
        }
      } catch (final IOException e) {
        errorCode = "KO";
        description = "Unable to read file " + docPath + " : " + e.getMessage();
        // Rebuild documentIdentifier (MCF id)
        final String documentIdentifier = lineToRead + DOCUMENT_ID_SEPARATOR + docPath;
        activities.recordActivity(startFetchTime, ACTIVITY_READ, 0L, documentIdentifier, errorCode, description, null);
        LOGGER.error(description, e);
      }
    }

  }

  @Override
  public String processSpecificationPost(final IPostParameters variableContext, final Locale locale, final Specification os, final int connectionSequenceNumber) throws ManifoldCFException {
    final String seqPrefix = "s" + connectionSequenceNumber + "_";

    String x;

    x = variableContext.getParameter(seqPrefix + "filepath_count");
    if (x != null && x.length() > 0) {
      // About to gather the filepath nodes, so get rid of the old ones.
      int i = 0;
      while (i < os.getChildCount()) {
        final SpecificationNode node = os.getChild(i);
        if (node.getType().equals(CSVConfig.NODE_FILEPATH)) {
          os.removeChild(i);
        } else {
          i++;
        }
      }
      final int count = Integer.parseInt(x);
      i = 0;
      while (i < count) {
        final String prefix = seqPrefix + "filepath_";
        final String suffix = "_" + Integer.toString(i);
        final String op = variableContext.getParameter(prefix + "op" + suffix);
        if (op == null || !op.equals("Delete")) {
          // Gather the includefilters etc.
          final String value = variableContext.getParameter(prefix + CSVConfig.ATTRIBUTE_VALUE + suffix);
          final SpecificationNode node = new SpecificationNode(CSVConfig.NODE_FILEPATH);
          node.setAttribute(CSVConfig.ATTRIBUTE_VALUE, value);
          os.addChild(os.getChildCount(), node);
        }
        i++;
      }

      final String addop = variableContext.getParameter(seqPrefix + "filepath_op");
      if (addop != null && addop.equals("Add")) {
        final String regex = variableContext.getParameter(seqPrefix + "filepath_value");
        final SpecificationNode node = new SpecificationNode(CSVConfig.NODE_FILEPATH);
        node.setAttribute(CSVConfig.ATTRIBUTE_VALUE, regex);
        os.addChild(os.getChildCount(), node);
      }
    }

    x = variableContext.getParameter(seqPrefix + CSVConfig.NODE_ID_COLUMN);
    if (x != null) {
      // Delete id column entry
      int i = 0;
      while (i < os.getChildCount()) {
        final SpecificationNode sn = os.getChild(i);
        if (sn.getType().equals(CSVConfig.NODE_ID_COLUMN)) {
          os.removeChild(i);
        } else {
          i++;
        }
      }
      if (x.length() > 0) {
        final SpecificationNode node = new SpecificationNode(CSVConfig.NODE_ID_COLUMN);
        node.setAttribute(CSVConfig.ATTRIBUTE_VALUE, x);
        os.addChild(os.getChildCount(), node);
      }
    }

    x = variableContext.getParameter(seqPrefix + CSVConfig.NODE_CONTENT_COLUMN);
    if (x != null) {
      // Delete content column entry
      int i = 0;
      while (i < os.getChildCount()) {
        final SpecificationNode sn = os.getChild(i);
        if (sn.getType().equals(CSVConfig.NODE_CONTENT_COLUMN)) {
          os.removeChild(i);
        } else {
          i++;
        }
      }
      if (x.length() > 0) {
        final SpecificationNode node = new SpecificationNode(CSVConfig.NODE_CONTENT_COLUMN);
        node.setAttribute(CSVConfig.ATTRIBUTE_VALUE, x);
        os.addChild(os.getChildCount(), node);
      }
    }

    x = variableContext.getParameter(seqPrefix + CSVConfig.NODE_SEPARATOR);
    if (x != null) {
      // Delete separator entry
      int i = 0;
      while (i < os.getChildCount()) {
        final SpecificationNode sn = os.getChild(i);
        if (sn.getType().equals(CSVConfig.NODE_SEPARATOR)) {
          os.removeChild(i);
        } else {
          i++;
        }
      }
      if (x.length() > 0) {
        final SpecificationNode node = new SpecificationNode(CSVConfig.NODE_SEPARATOR);
        node.setAttribute(CSVConfig.ATTRIBUTE_VALUE, x);
        os.addChild(os.getChildCount(), node);
      }
    }

    return null;
  }

  /**
   * Output the specification header section. This method is called in the head section of a job page which has selected a pipeline connection of the current type. Its purpose is to add the required
   * tabs to the list, and to output any javascript methods that might be needed by the job editing HTML.
   *
   * @param out                      is the output to which any HTML should be sent.
   * @param locale                   is the preferred local of the output.
   * @param os                       is the current pipeline specification for this connection.
   * @param connectionSequenceNumber is the unique number of this connection within the job.
   * @param tabsArray                is an array of tab names. Add to this array any tab names that are specific to the connector.
   */
  @Override
  public void outputSpecificationHeader(final IHTTPOutput out, final Locale locale, final Specification os, final int connectionSequenceNumber, final List<String> tabsArray)
      throws ManifoldCFException, IOException {
    final Map<String, Object> paramMap = new HashMap<>();
    paramMap.put("SEQNUM", Integer.toString(connectionSequenceNumber));

    tabsArray.add(Messages.getString(locale, "CSV.CSVTabName"));

    Messages.outputResourceWithVelocity(out, locale, EDIT_SPECIFICATION_JS, paramMap);
  }

  /**
   * Output the specification body section. This method is called in the body section of a job page which has selected a pipeline connection of the current type. Its purpose is to present the required
   * form elements for editing. The coder can presume that the HTML that is output from this configuration will be within appropriate <html>, <body>, and <form> tags. The name of the form is
   * "editjob".
   *
   * @param out                      is the output to which any HTML should be sent.
   * @param locale                   is the preferred local of the output.
   * @param os                       is the current pipeline specification for this job.
   * @param connectionSequenceNumber is the unique number of this connection within the job.
   * @param actualSequenceNumber     is the connection within the job that has currently been selected.
   * @param tabName                  is the current tab name.
   */
  @Override
  public void outputSpecificationBody(final IHTTPOutput out, final Locale locale, final Specification os, final int connectionSequenceNumber, final int actualSequenceNumber, final String tabName)
      throws ManifoldCFException, IOException {
    final Map<String, Object> paramMap = new HashMap<>();

    // Set the tab name
    paramMap.put("TABNAME", tabName);
    paramMap.put("SEQNUM", Integer.toString(connectionSequenceNumber));
    paramMap.put("SELECTEDNUM", Integer.toString(actualSequenceNumber));

    // Fill in the field mapping tab data
    fillInCSVSpecificationMap(paramMap, os);
    // fillInSecuritySpecificationMap(paramMap, os);

    Messages.outputResourceWithVelocity(out, locale, EDIT_SPECIFICATION_CSV_HTML, paramMap);
  }

  /**
   * View specification. This method is called in the body section of a job's view page. Its purpose is to present the pipeline specification information to the user. The coder can presume that the
   * HTML that is output from this configuration will be within appropriate <html> and <body> tags.
   *
   * @param out                      is the output to which any HTML should be sent.
   * @param locale                   is the preferred local of the output.
   * @param connectionSequenceNumber is the unique number of this connection within the job.
   * @param os                       is the current pipeline specification for this job.
   */

  @Override
  public void viewSpecification(final IHTTPOutput out, final Locale locale, final Specification os, final int connectionSequenceNumber) throws ManifoldCFException, IOException {
    final Map<String, Object> paramMap = new HashMap<>();
    paramMap.put("SEQNUM", Integer.toString(connectionSequenceNumber));

    // Fill in the map with data from all tabs
    fillInCSVSpecificationMap(paramMap, os);

    Messages.outputResourceWithVelocity(out, locale, VIEW_SPECIFICATION_CSV_HTML, paramMap);

  }

  private void fillInCSVSpecificationMap(final Map<String, Object> paramMap, final Specification os) {

    final List<String> filePaths = new ArrayList<>();
    String contentColumn = "content";
    String idColumn = "id";
    String separator = ",";

    for (int i = 0; i < os.getChildCount(); i++) {
      final SpecificationNode sn = os.getChild(i);

      if (sn.getType().equals(CSVConfig.NODE_FILEPATH)) {
        final String includeFileFilter = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        if (includeFileFilter != null) {
          filePaths.add(includeFileFilter);
        }
      } else if (sn.getType().equals(CSVConfig.NODE_ID_COLUMN)) {
        if (sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE) != null) {
          idColumn = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        }
      } else if (sn.getType().equals(CSVConfig.NODE_CONTENT_COLUMN)) {
        if (sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE) != null) {
          contentColumn = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        }
      } else if (sn.getType().equals(CSVConfig.NODE_SEPARATOR)) {
        if (sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE) != null) {
          separator = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        }
      }
    }

    paramMap.put("FILEPATHS", filePaths);
    paramMap.put("CONTENTCOLUMN", contentColumn);
    paramMap.put("IDCOLUMN", idColumn);
    paramMap.put("SEPARATOR", separator);
  }

  private static class CSVSpecs {

    private final Map<String, String[]> CSVMap = new HashMap<>();
    private String contentColumnLabel = "content";
    private final String idColumnLabel = "id";
    private String separator;

    public CSVSpecs(final Specification os) {

      final List<String> csvFiles = new ArrayList<>();

      for (int i = 0; i < os.getChildCount(); i++) {
        final SpecificationNode sn = os.getChild(i);

        if (sn.getType().equals(CSVConfig.NODE_FILEPATH)) {
          final String value = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
          csvFiles.add(value);
        } else if (sn.getType().equals(CSVConfig.NODE_CONTENT_COLUMN)) {
          contentColumnLabel = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        } else if (sn.getType().equals(CSVConfig.NODE_SEPARATOR)) {
          separator = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        } else if (sn.getType().equals(CSVConfig.NODE_ID_COLUMN)) {
          separator = sn.getAttributeValue(CSVConfig.ATTRIBUTE_VALUE);
        }

      }

      for (final String csvFilePath : csvFiles) {
        try {
          final String[] columnsLabel = CSVUtils.getColumnsLabel(csvFilePath, separator);
          CSVMap.put(csvFilePath, columnsLabel);
        } catch (final IOException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      }

    }

    public Map<String, String[]> getCSVMap() {
      return CSVMap;
    }

    public String getContentColumnLabel() {
      return contentColumnLabel;
    }

    public String getIdColumnLabel() {
      return idColumnLabel;
    }

    public String getSeparator() {
      return separator;
    }

  }
}

Reply via email to