This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 134bbf8  Add more solr support to tika pipes (#445)
134bbf8 is described below

commit 134bbf876dcf39eb50fb6a456893d0e2cb228c17
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon May 17 11:37:07 2021 -0500

    Add more solr support to tika pipes (#445)
    
    * Add solr pipe iterator
    Add solr emitter
    Add end-to-end tika async test with solr pipe iterator, file system fetcher, and solr emitter.
    
    * fix styles
    
    * resolve some enforcer findings
---
 .../java/org/apache/tika/pipes/PipesClient.java    |   2 +
 tika-parent/pom.xml                                |   2 +
 tika-pipes/tika-emitters/tika-emitter-solr/pom.xml |  35 ++-
 .../tika/pipes/emitter/solr/SolrEmitter.java       | 325 +++++++++++----------
 .../apache/tika/pipes/emitter/solr/TestBasic.java  |  94 ------
 tika-pipes/tika-pipes-integration-tests/pom.xml    |  30 ++
 .../tika/pipes/solrtest/TikaPipesSolr6Test.java    |  46 +++
 .../tika/pipes/solrtest/TikaPipesSolr7Test.java    |  47 +++
 .../tika/pipes/solrtest/TikaPipesSolr8Test.java    |  46 +++
 .../tika/pipes/solrtest/TikaPipesSolrTestBase.java | 155 ++++++++++
 .../src/test/resources/tika-async-log4j.properties |  13 +
 .../src/test/resources/tika-config-solr-urls.xml   | 103 +++++++
 tika-pipes/tika-pipes-iterators/pom.xml            |   1 +
 .../tika-pipes-iterator-solr}/pom.xml              |  60 +++-
 .../tika/pipes/solrtest/SolrPipesIterator.java     | 264 +++++++++++++++++
 15 files changed, 970 insertions(+), 253 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index ac46270..2db98b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -342,6 +342,8 @@ public class PipesClient implements Closeable {
                     SERVER_LOG.warn(line.substring(5));
                 } else if (line.startsWith("error ")) {
                     SERVER_LOG.error(line.substring(6));
+                } else {
+                    SERVER_LOG.error(line);
                 }
                 try {
                     line = reader.readLine();
diff --git a/tika-parent/pom.xml b/tika-parent/pom.xml
index 66b5b99..4c5e38c 100644
--- a/tika-parent/pom.xml
+++ b/tika-parent/pom.xml
@@ -347,9 +347,11 @@
     <sis.version>1.0</sis.version>
     <!-- we'll need to stay on 1.7 until we're java modularized ? -->
     <slf4j.version>1.7.30</slf4j.version>
+    <solrj.version>8.8.2</solrj.version>
     <spring.version>5.3.3</spring.version>
     <sqlite.version>3.34.0</sqlite.version>
     <tagsoup.version>1.2.1</tagsoup.version>
+    <test.containers.version>1.15.2</test.containers.version>
     <!-- NOTE: sync tukaani version with commons-compress in tika-parent-->
     <tukaani.version>1.8</tukaani.version>
     <twelvemonkeys.version>3.6.1</twelvemonkeys.version>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index 074152e..6d0d485 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -51,6 +51,35 @@
       <artifactId>log4j-slf4j-impl</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-io</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-http</artifactId>
+      <version>${jetty.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-solrj</artifactId>
+      <version>${solrj.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.eclipse.jetty</groupId>
+          <artifactId>jetty-http</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
@@ -103,17 +132,17 @@
               </filters>
               <transformers>
                 <transformer
-                    
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                        
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                   <resource>META-INF/LICENSE</resource>
                   <file>target/classes/META-INF/LICENSE</file>
                 </transformer>
                 <transformer
-                    
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                        
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                   <resource>META-INF/NOTICE</resource>
                   <file>target/classes/META-INF/NOTICE</file>
                 </transformer>
                 <transformer
-                    
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+                        
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                   <resource>META-INF/DEPENDENCIES</resource>
                   <file>target/classes/META-INF/DEPENDENCIES</file>
                 </transformer>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 0d9cd35..78eca0d 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,26 +16,22 @@
  */
 package org.apache.tika.pipes.emitter.solr;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
 import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
-import java.util.zip.GZIPOutputStream;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.http.client.HttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.tika.client.HttpClientFactory;
-import org.apache.tika.client.HttpClientUtil;
-import org.apache.tika.client.TikaClientException;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -45,87 +41,72 @@ import org.apache.tika.metadata.Metadata;
 import org.apache.tika.pipes.emitter.AbstractEmitter;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SolrEmitter extends AbstractEmitter implements Initializable {
 
-    private static final String ATTACHMENTS = "attachments";
-    private static final String UPDATE_PATH = "/update";
+    public enum AttachmentStrategy {
+        SKIP,
+        CONCATENATE_CONTENT,
+        PARENT_CHILD,
+        //anything else?
+    }
+
+    public enum UpdateStrategy {
+        ADD,
+        UPDATE_MUST_EXIST,
+        UPDATE_MUST_NOT_EXIST,
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(SolrEmitter.class);
-    //one day this will be allowed or can be configured?
-    private final boolean gzipJson = false;
+
     private AttachmentStrategy attachmentStrategy = 
AttachmentStrategy.PARENT_CHILD;
-    private String url;
+    private UpdateStrategy updateStrategy = UpdateStrategy.ADD;
+    private String solrCollection;
+    /**
+     * You can specify solrUrls, or you can specify solrZkHosts and use use 
zookeeper to determine the solr server urls.
+     */
+    private List<String> solrUrls;
+    private List<String> solrZkHosts;
+    private String solrZkChroot;
     private String contentField = "content";
     private String idField = "id";
-    private int commitWithin = 100;
-    private HttpClientFactory httpClientFactory;
-    private HttpClient httpClient;
+    private int commitWithin = 1000;
+    private int connectionTimeout = 10000;
+    private int socketTimeout = 60000;
+    private final HttpClientFactory httpClientFactory;
+    private SolrClient solrClient;
+
     public SolrEmitter() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
     }
 
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList)
-            throws IOException, TikaEmitterException {
-
+    public void emit(String emitKey, List<Metadata> metadataList) throws 
IOException,
+            TikaEmitterException {
         if (metadataList == null || metadataList.size() == 0) {
             LOG.warn("metadataList is null or empty");
             return;
         }
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ? new BufferedWriter(
-                new OutputStreamWriter(new GZIPOutputStream(bos), 
StandardCharsets.UTF_8)) :
-                new BufferedWriter(new OutputStreamWriter(bos, 
StandardCharsets.UTF_8));
-        try (JsonGenerator jsonGenerator = new 
JsonFactory().createGenerator(writer)) {
-            jsonGenerator.writeStartArray();
-            jsonify(jsonGenerator, emitKey, metadataList);
-            jsonGenerator.writeEndArray();
-        }
-        LOG.debug("emitting json ({})", new String(bos.toByteArray(), 
StandardCharsets.UTF_8));
-        try {
-            HttpClientUtil
-                    .postJson(httpClient, url + UPDATE_PATH +
-                                    "?commitWithin=" + getCommitWithin(),
-                            bos.toByteArray(), gzipJson);
-        } catch (TikaClientException e) {
-            throw new TikaEmitterException("can't post", e);
-        }
+        List<SolrInputDocument> docsToUpdate = new ArrayList<>();
+        addMetadataAsSolrInputDocuments(emitKey, metadataList, docsToUpdate);
+        emitSolrBatch(docsToUpdate);
     }
 
-    @Override
-    public void emit(List<? extends EmitData> batch) throws IOException, 
TikaEmitterException {
-        if (batch == null || batch.size() == 0) {
-            LOG.warn("batch is null or empty");
-            return;
-        }
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Writer writer = gzipJson ? new BufferedWriter(
-                new OutputStreamWriter(new GZIPOutputStream(bos), 
StandardCharsets.UTF_8)) :
-                new BufferedWriter(new OutputStreamWriter(bos, 
StandardCharsets.UTF_8));
-        try (JsonGenerator jsonGenerator = new 
JsonFactory().createGenerator(writer)) {
-            jsonGenerator.writeStartArray();
-            for (EmitData d : batch) {
-                jsonify(jsonGenerator, d.getEmitKey().getEmitKey(), 
d.getMetadataList());
-            }
-            jsonGenerator.writeEndArray();
-        }
-        LOG.debug("emitting json ({})", new String(bos.toByteArray(), 
StandardCharsets.UTF_8));
-        try {
-            HttpClientUtil
-                    .postJson(httpClient, url + UPDATE_PATH +
-                                    "?commitWithin=" + getCommitWithin(),
-                            bos.toByteArray(), gzipJson);
-        } catch (TikaClientException e) {
-            throw new TikaEmitterException("can't post", e);
+    private void addMetadataAsSolrInputDocuments(String emitKey, 
List<Metadata> metadataList,
+                                                 List<SolrInputDocument> 
docsToUpdate)
+            throws IOException, TikaEmitterException {
+        SolrInputDocument solrInputDocument = new SolrInputDocument();
+        solrInputDocument.setField(idField, emitKey);
+        if (updateStrategy == UpdateStrategy.UPDATE_MUST_EXIST) {
+            solrInputDocument.setField("_version_", 1);
+        } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
+            solrInputDocument.setField("_version_", -1);
         }
-    }
-
-    private void jsonify(JsonGenerator jsonGenerator, String emitKey,
-                         List<Metadata> metadataList)
-            throws IOException {
-        metadataList.get(0).set(idField, emitKey);
-        if (attachmentStrategy == AttachmentStrategy.SKIP || 
metadataList.size() == 1) {
-            jsonify(metadataList.get(0), jsonGenerator);
+        if (attachmentStrategy == AttachmentStrategy.SKIP ||
+                metadataList.size() == 1) {
+            addMetadataToSolrInputDocument(metadataList.get(0), 
solrInputDocument, updateStrategy);
         } else if (attachmentStrategy == 
AttachmentStrategy.CONCATENATE_CONTENT) {
             //this only handles text for now, not xhtml
             StringBuilder sb = new StringBuilder();
@@ -137,95 +118,106 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
             }
             Metadata parent = metadataList.get(0);
             parent.set(getContentField(), sb.toString());
-            jsonify(parent, jsonGenerator);
+            addMetadataToSolrInputDocument(parent, solrInputDocument, 
updateStrategy);
         } else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
-            jsonify(metadataList.get(0), jsonGenerator, false);
-            jsonGenerator.writeArrayFieldStart(ATTACHMENTS);
-
+            addMetadataToSolrInputDocument(metadataList.get(0), 
solrInputDocument, updateStrategy);
             for (int i = 1; i < metadataList.size(); i++) {
+                SolrInputDocument childSolrInputDocument = new 
SolrInputDocument();
                 Metadata m = metadataList.get(i);
-                m.set(idField, UUID.randomUUID().toString());
-                jsonify(m, jsonGenerator);
+                childSolrInputDocument.setField(idField, 
UUID.randomUUID().toString());
+                addMetadataToSolrInputDocument(m, childSolrInputDocument, 
updateStrategy);
             }
-            jsonGenerator.writeEndArray();
-            jsonGenerator.writeEndObject();
         } else {
-            throw new IllegalArgumentException(
-                    "I don't yet support this attachment strategy: " + 
attachmentStrategy);
+            throw new IllegalArgumentException("I don't yet support this 
attachment strategy: "
+                    + attachmentStrategy);
         }
+        docsToUpdate.add(solrInputDocument);
     }
 
-    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator, 
boolean writeEndObject)
-            throws IOException {
-        jsonGenerator.writeStartObject();
+    @Override
+    public void emit(List<? extends EmitData> batch) throws IOException, 
TikaEmitterException {
+        if (batch == null || batch.size() == 0) {
+            LOG.warn("batch is null or empty");
+            return;
+        }
+        List<SolrInputDocument> docsToUpdate = new ArrayList<>();
+        for (EmitData d : batch) {
+            addMetadataAsSolrInputDocuments(d.getEmitKey().getEmitKey(), 
d.getMetadataList(), docsToUpdate);
+        }
+        emitSolrBatch(docsToUpdate);
+    }
+
+    private void emitSolrBatch(List<SolrInputDocument> docsToUpdate) throws 
IOException, TikaEmitterException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Emitting solr doc batch: {}", docsToUpdate);
+        }
+        if (!docsToUpdate.isEmpty()) {
+            try {
+                UpdateRequest req = new UpdateRequest();
+                req.add(docsToUpdate);
+                req.setCommitWithin(commitWithin);
+                req.setParam("failOnVersionConflicts", "false");
+                req.process(solrClient, solrCollection);
+            } catch (Exception e) {
+                throw new TikaEmitterException("Could not add batch to solr", 
e);
+            }
+        }
+    }
+
+    private void addMetadataToSolrInputDocument(Metadata metadata, 
SolrInputDocument solrInputDocument,
+                                                UpdateStrategy updateStrategy) 
{
         for (String n : metadata.names()) {
             String[] vals = metadata.getValues(n);
             if (vals.length == 0) {
                 continue;
             } else if (vals.length == 1) {
-                jsonGenerator.writeStringField(n, vals[0]);
+                if (updateStrategy == UpdateStrategy.ADD) {
+                    solrInputDocument.setField(n, vals[0]);
+                } else {
+                    solrInputDocument.setField(n, new HashMap<String, 
String>() {{
+                        put("set", vals[0]);
+                    }});
+                }
             } else if (vals.length > 1) {
-                jsonGenerator.writeArrayFieldStart(n);
-                for (String val : vals) {
-                    jsonGenerator.writeString(val);
+                if (updateStrategy == UpdateStrategy.ADD) {
+                    solrInputDocument.setField(n, vals);
+                } else {
+                    solrInputDocument.setField(n, new HashMap<String, 
String[]>() {{
+                        put("set", vals);
+                    }});
                 }
-                jsonGenerator.writeEndArray();
             }
         }
-        if (writeEndObject) {
-            jsonGenerator.writeEndObject();
-        }
-    }
-
-    private void jsonify(Metadata metadata, JsonGenerator jsonGenerator) 
throws IOException {
-        jsonify(metadata, jsonGenerator, true);
     }
 
     /**
-     * Options: "skip", "concatenate-content", "parent-child". Default is 
"parent-child".
-     * If set to "skip", this will index only the main file and ignore all info
-     * in the attachments.  If set to "concatenate", this will concatenate the
+     * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is 
"PARENT_CHILD".
+     * If set to "SKIP", this will index only the main file and ignore all info
+     * in the attachments.  If set to "CONCATENATE_CONTENT", this will 
concatenate the
      * content extracted from the attachments into the main document and
      * then index the main document with the concatenated content _and_ the
      * main document's metadata (metadata from attachments will be thrown 
away).
-     * If set to "parent-child", this will index the attachments as children
+     * If set to "PARENT_CHILD", this will index the attachments as children
      * of the parent document via Solr's parent-child relationship.
-     *
-     * @param attachmentStrategy
      */
     @Field
     public void setAttachmentStrategy(String attachmentStrategy) {
-        switch (attachmentStrategy) {
-            case "skip":
-                this.attachmentStrategy = AttachmentStrategy.SKIP;
-                break;
-            case "concatenate-content":
-                this.attachmentStrategy = 
AttachmentStrategy.CONCATENATE_CONTENT;
-                break;
-            case "parent-child":
-                this.attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
-                break;
-            default:
-                throw new IllegalArgumentException("Expected 'skip', 
'concatenate-content' or " +
-                    "'parent-child'. I regret I do not recognize: " + 
attachmentStrategy);
-        }
+        this.attachmentStrategy = 
AttachmentStrategy.valueOf(attachmentStrategy);
     }
 
-    /**
-     * Specify the url for Solr
-     *
-     * @param url
-     */
     @Field
-    public void setUrl(String url) {
-        if (url.endsWith("/")) {
-            url = url.substring(0, url.length() - 1);
-        }
-        this.url = url;
+    public void setUpdateStrategy(String updateStrategy) {
+        this.updateStrategy = UpdateStrategy.valueOf(updateStrategy);
     }
 
-    public String getContentField() {
-        return contentField;
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
     }
 
     /**
@@ -242,8 +234,8 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.contentField = contentField;
     }
 
-    public int getCommitWithin() {
-        return commitWithin;
+    public String getContentField() {
+        return contentField;
     }
 
     @Field
@@ -251,6 +243,10 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.commitWithin = commitWithin;
     }
 
+    public int getCommitWithin() {
+        return commitWithin;
+    }
+
     /**
      * Specify the field in the first Metadata that should be
      * used as the id field for the document.
@@ -262,7 +258,27 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
         this.idField = idField;
     }
 
-    //TODO -- add other httpclient configurations
+    @Field
+    public void setSolrCollection(String solrCollection) {
+        this.solrCollection = solrCollection;
+    }
+
+    @Field
+    public void setSolrUrls(List<String> solrUrls) {
+        this.solrUrls = solrUrls;
+    }
+
+    @Field
+    public void setSolrZkHosts(List<String> solrZkHosts) {
+        this.solrZkHosts = solrZkHosts;
+    }
+
+    @Field
+    public void setSolrZkChroot(String solrZkChroot) {
+        this.solrZkChroot = solrZkChroot;
+    }
+
+    //TODO -- add other httpclient configurations??
     @Field
     public void setUserName(String userName) {
         httpClientFactory.setUserName(userName);
@@ -290,19 +306,34 @@ public class SolrEmitter extends AbstractEmitter implements Initializable {
 
     @Override
     public void initialize(Map<String, Param> params) throws 
TikaConfigException {
-        //TODO: build the client here?
-        httpClient = httpClientFactory.build();
+        if (solrUrls == null || solrUrls.isEmpty()) {
+            solrClient = new CloudSolrClient.Builder(solrZkHosts, 
Optional.ofNullable(solrZkChroot))
+                    .withConnectionTimeout(connectionTimeout)
+                    .withSocketTimeout(socketTimeout)
+                    .withHttpClient(httpClientFactory.build())
+                    .build();
+        } else {
+            solrClient = new LBHttpSolrClient.Builder()
+                    .withConnectionTimeout(connectionTimeout)
+                    .withSocketTimeout(socketTimeout)
+                    .withHttpClient(httpClientFactory.build())
+                    .withBaseSolrUrls(solrUrls.toArray(new String[] 
{})).build();
+        }
     }
 
     @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler)
-            throws TikaConfigException {
-
-    }
-
-    enum AttachmentStrategy {
-        SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
-        //anything else?
+    public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
+        mustNotBeEmpty("solrCollection", this.solrCollection);
+        mustNotBeEmpty("urlFieldName", this.idField);
+        if ((this.solrUrls == null || this.solrUrls.isEmpty()) &&
+                (this.solrZkHosts == null || this.solrZkHosts.isEmpty())) {
+            throw new IllegalArgumentException(
+                    "expected either param solrUrls or param solrZkHosts, but 
neither was specified");
+        }
+        if (this.solrUrls != null && !this.solrUrls.isEmpty() && 
this.solrZkHosts != null &&
+                !this.solrZkHosts.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "expected either param solrUrls or param solrZkHosts, but 
both were specified");
+        }
     }
-
 }
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
deleted file mode 100644
index d88947e..0000000
--- a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.emitter.solr;
-
-
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.filter.MetadataFilter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-
-@Ignore("requires solr to be up and running; please dockerize some tests, 
please, please")
-public class TestBasic {
-
-    @Test
-    public void testBasic() throws Exception {
-        TikaConfig tikaConfig = new TikaConfig(
-                
TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
-        EmitterManager emitterManager = EmitterManager.load(
-                
Paths.get(TestBasic.class.getResource("/tika-config-simple-emitter.xml").toURI())
-        );
-        Emitter emitter = emitterManager.getEmitter("solr1");
-        List<Metadata> metadataList = getParentChild(tikaConfig, "id1", 2);
-
-        emitter.emit("1", metadataList);
-    }
-
-    @Test
-    public void testBatch() throws Exception {
-        TikaConfig tikaConfig = new TikaConfig(
-                
TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
-        EmitterManager emitterManager = EmitterManager.load(
-                
Paths.get(TestBasic.class.getResource("/tika-config-simple-emitter.xml").toURI())
-        );
-        Emitter emitter = emitterManager.getEmitter("solr2");
-        List<EmitData> emitData = new ArrayList<>();
-        for (int i = 0; i < 100; i++) {
-            List<Metadata> metadataList = getParentChild(tikaConfig, "batch_" 
+ i, 4);
-            emitData.add(new EmitData(new EmitKey(emitter.getName(),
-                    "batch_" + i), metadataList));
-        }
-        emitter.emit(emitData);
-    }
-
-    private List<Metadata> getParentChild(TikaConfig tikaConfig, String id, 
int numChildren)
-            throws TikaException {
-        List<Metadata> metadataList = new ArrayList<>();
-        MetadataFilter filter = tikaConfig.getMetadataFilter();
-
-        Metadata m1 = new Metadata();
-        m1.set("id", id);
-        m1.set(Metadata.CONTENT_LENGTH, "314159");
-        m1.set(TikaCoreProperties.TIKA_CONTENT, "the quick brown");
-        m1.set(TikaCoreProperties.TITLE, "this is the first title");
-        m1.add(TikaCoreProperties.CREATOR, "firstAuthor");
-        m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
-        filter.filter(m1);
-        metadataList.add(m1);
-        for (int i = 1; i < numChildren; i++) {
-            Metadata m2 = new Metadata();
-            m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH, 
"/path_to_this.txt");
-            m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy 
" + i);
-            filter.filter(m2);
-            metadataList.add(m2);
-        }
-        return metadataList;
-    }
-
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml b/tika-pipes/tika-pipes-integration-tests/pom.xml
index fbaef59..b4c4036 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -71,6 +71,36 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>testcontainers</artifactId>
+      <version>${test.containers.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-pipes-iterator-solr</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-emitter-solr</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-app</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-solrj</artifactId>
+      <version>${solrj.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java
new file mode 100644
index 0000000..12e9ac7
--- /dev/null
+++ b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr6Test extends TikaPipesSolrTestBase {
+
+    @Rule
+    public GenericContainer<?> solr6 = new 
GenericContainer<>(DockerImageName.parse("solr:6"))
+            .withExposedPorts(8983, 9983)
+            .withCommand("-DzkRun");
+
+    @Before
+    public void setupTest() throws Exception {
+        setupSolr(solr6);
+    }
+
+    @Test
+    public void testFetchIteratorWithSolrUrls() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+    }
+
+    @Test
+    public void testFetchIteratorWithZkHost() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
new file mode 100644
index 0000000..c9cf566
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr7Test extends TikaPipesSolrTestBase {
+
+    @Rule
+    public GenericContainer<?> solr7 = new 
GenericContainer<>(DockerImageName.parse("solr:7"))
+            .withExposedPorts(8983, 9983)
+            .withCommand("-DzkRun");
+
+    @Before
+    public void setupTest() throws Exception {
+        setupSolr(solr7);
+    }
+
+    @Test
+    public void testFetchIteratorWithSolrUrls() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+
+    }
+
+    @Test
+    public void testFetchIteratorWithZkHost() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
new file mode 100644
index 0000000..d1470df
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr8Test extends TikaPipesSolrTestBase {
+
+    @Rule
+    public GenericContainer<?> solr8 = new 
GenericContainer<>(DockerImageName.parse("solr:8"))
+            .withExposedPorts(8983, 9983)
+            .withCommand("-DzkRun");
+
+    @Before
+    public void setupTest() throws Exception {
+        setupSolr(solr8);
+    }
+
+    @Test
+    public void testFetchIteratorWithSolrUrls() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+    }
+
+    @Test
+    public void testFetchIteratorWithZkHost() throws Exception {
+        runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
new file mode 100644
index 0000000..12f3f77
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.pipes.PipeIntegrationTests;
+import org.apache.tika.pipes.emitter.solr.SolrEmitter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+public abstract class TikaPipesSolrTestBase {
+
+    private final String collection = "testcol";
+    private final int numDocs = 42;
+
+    protected GenericContainer<?> solr;
+
+    private final File testFileFolder = new File("target", "test-files");
+    private String solrHost;
+    private int solrPort;
+    private int zkPort;
+    private String solrEndpoint;
+
+    private void createTestHtmlFiles(String bodyContent) throws Exception {
+        testFileFolder.mkdirs();
+        for (int i = 0; i < numDocs; ++i) {
+            FileUtils.writeStringToFile(new File(testFileFolder, "test-" + i + 
".html"), "<html><body>" + bodyContent + "</body></html>", 
StandardCharsets.UTF_8);
+        }
+    }
+
+    protected void setupSolr(GenericContainer<?> solr) throws Exception {
+        createTestHtmlFiles("initial");
+        this.solr = solr;
+        solrHost = solr.getHost();
+        solrPort = solr.getMappedPort(8983);
+        zkPort = solr.getMappedPort(9983);
+        solrEndpoint = "http://"; + solrHost + ":" + solrPort + "/solr";
+
+        solr.execInContainer("/opt/solr/bin/solr", "create_collection", "-c", 
collection);
+
+        try (SolrClient solrClient = new LBHttpSolrClient.Builder()
+                .withBaseSolrUrls(solrEndpoint).build()) {
+
+            for (int i = 0; i < numDocs; ++i) {
+                SolrInputDocument solrDoc = new SolrInputDocument();
+                String filename = "test-" + i + ".html";
+                solrDoc.setField("id", filename);
+                solrDoc.setField("path", filename);
+                solrClient.add(collection, solrDoc);
+            }
+            solrClient.commit(collection);
+        }
+    }
+
+    /**
+     * Runs a test using Solr Pipe Iterator, File Fetcher and Solr Emitter.
+     * @param useZk If true, use zookeeper to connect to solr. Otherwise use 
direct solr URLs.
+     */
+    protected void runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(boolean 
useZk) throws Exception {
+        File tikaConfigFile = new File("target", "ta.xml");
+        File log4jPropFile = new File("target", "tmp-log4j.properties");
+        try (InputStream is = 
PipeIntegrationTests.class.getResourceAsStream("/tika-async-log4j.properties")) 
{
+            FileUtils.copyInputStreamToFile(is, log4jPropFile);
+        }
+        String tikaConfigTemplateXml;
+        try (InputStream is = 
PipeIntegrationTests.class.getResourceAsStream("/tika-config-solr-urls.xml")) {
+            tikaConfigTemplateXml = IOUtils.toString(is, 
StandardCharsets.UTF_8);
+        }
+
+        String tikaConfigXml = createTikaConfigXml(useZk,
+                tikaConfigFile,
+                log4jPropFile,
+                tikaConfigTemplateXml,
+                SolrEmitter.UpdateStrategy.ADD,
+                SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, 
StandardCharsets.UTF_8);
+
+        TikaCLI.main(new String[]{"-a", "--config=" + 
tikaConfigFile.getAbsolutePath()});
+
+        try (SolrClient solrClient = new LBHttpSolrClient.Builder()
+                .withBaseSolrUrls(solrEndpoint).build()) {
+            solrClient.commit(collection);
+            Assert.assertEquals(numDocs, solrClient.query(collection, new 
SolrQuery("mime_s:\"text/html; 
charset=ISO-8859-1\"")).getResults().getNumFound());
+            Assert.assertEquals(numDocs, solrClient.query(collection, new 
SolrQuery("content_s:*initial*")).getResults().getNumFound());
+        }
+
+        // update the documents with "update must exist" and run tika async 
again with "UPDATE_MUST_EXIST". It should not fail, and docs should be updated.
+        createTestHtmlFiles("updated");
+        tikaConfigXml = createTikaConfigXml(useZk,
+                tikaConfigFile,
+                log4jPropFile,
+                tikaConfigTemplateXml,
+                SolrEmitter.UpdateStrategy.UPDATE_MUST_EXIST,
+                SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, 
StandardCharsets.UTF_8);
+
+        TikaCLI.main(new String[]{"-a", "--config=" + 
tikaConfigFile.getAbsolutePath()});
+
+        try (SolrClient solrClient = new LBHttpSolrClient.Builder()
+                .withBaseSolrUrls(solrEndpoint).build()) {
+            solrClient.commit(collection);
+            Assert.assertEquals(numDocs, solrClient.query(collection, new 
SolrQuery("mime_s:\"text/html; 
charset=ISO-8859-1\"")).getResults().getNumFound());
+            Assert.assertEquals(numDocs, solrClient.query(collection, new 
SolrQuery("content_s:*updated*")).getResults().getNumFound());
+        }
+    }
+
+    @NotNull
+    private String createTikaConfigXml(boolean useZk,
+                                       File tikaConfigFile,
+                                       File log4jPropFile,
+                                       String tikaConfigTemplateXml,
+                                       SolrEmitter.UpdateStrategy 
updateStrategy,
+                                       SolrEmitter.AttachmentStrategy 
attachmentStrategy) {
+        String res = tikaConfigTemplateXml.replace("{TIKA_CONFIG}", 
tikaConfigFile.getAbsolutePath())
+                .replace("{UPDATE_STRATEGY}", updateStrategy.toString())
+                .replace("{ATTACHMENT_STRATEGY}", 
attachmentStrategy.toString())
+                .replace("{LOG4J_PROPERTIES_FILE}", 
log4jPropFile.getAbsolutePath())
+                .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+        if (useZk) {
+            res = res.replace("{SOLR_CONNECTION}", "<solrZkHosts>\n" +
+                    "        <solrZkHost>" + solrHost + ":" + zkPort + 
"</solrZkHost>\n" +
+                    "      </solrZkHosts>\n");
+        } else {
+            res = res.replace("{SOLR_CONNECTION}", "<solrUrls>\n" +
+                    "        <solrUrl>http://"; + solrHost + ":" + solrPort + 
"/solr</solrUrl>\n" +
+                    "      </solrUrls>\n");
+        }
+        return res;
+    }
+}
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
new file mode 100644
index 0000000..c7c6821
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
@@ -0,0 +1,13 @@
+status=debug
+name=PropertiesConfig
+filters=threshold
+filter.threshold.type=ThresholdFilter
+filter.threshold.level=debug
+appenders=console
+appender.console.type=Console
+appender.console.name=STDERR
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
+rootLogger.level=debug
+rootLogger.appenderRefs=stderr
+rootLogger.appenderRef.stderr.ref=STDERR
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
new file mode 100644
index 0000000..0fcb1e7
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
@@ -0,0 +1,103 @@
+<properties>
+  <parsers>
+    <parser class="org.apache.tika.parser.DefaultParser">
+      <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+      <parser-exclude class="org.apache.tika.parser.pdf.PDFParser"/>
+      <parser-exclude 
class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"/>
+      <parser-exclude class="org.apache.tika.parser.microsoft.OfficeParser"/>
+    </parser>
+    <parser class="org.apache.tika.parser.pdf.PDFParser">
+      <params>
+        <param name="extractActions" type="bool">true</param>
+        <param name="checkExtractAccessPermissions" type="bool">true</param>
+      </params>
+    </parser>
+    <parser class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser">
+      <params>
+        <param name="includeDeletedContent" type="bool">true</param>
+        <param name="includeMoveFromContent" type="bool">true</param>
+        <param name="extractMacros" type="bool">true</param>
+      </params>
+    </parser>
+    <parser class="org.apache.tika.parser.microsoft.OfficeParser">
+      <params>
+        <param name="extractMacros" type="bool">true</param>
+      </params>
+    </parser>
+  </parsers>
+  <metadataFilters>
+    <metadataFilter 
class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+      <params>
+        <excludeUnmapped>true</excludeUnmapped>
+        <mappings>
+          <mapping from="X-TIKA:content" to="content_s"/>
+          <mapping from="Content-Length" to="length_i"/>
+          <mapping from="dc:creator" to="creators_ss"/>
+          <mapping from="dc:title" to="title_s"/>
+          <mapping from="Content-Type" to="mime_s"/>
+          <mapping from="X-TIKA:EXCEPTION:container_exception" 
to="tika_exception_s"/>
+        </mappings>
+      </params>
+    </metadataFilter>
+  </metadataFilters>
+  <async>
+    <params>
+      <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+      <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+      <emitWithinMillis>60000</emitWithinMillis>
+      <numEmitters>1</numEmitters>
+      <numClients>1</numClients>
+      <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+      <forkedJvmArgs>
+        <arg>-Xmx1g</arg>
+        <arg>-XX:ParallelGCThreads=2</arg>
+        <arg>-XX:+ExitOnOutOfMemoryError</arg>
+        <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+      </forkedJvmArgs>
+      <timeoutMillis>60000</timeoutMillis>
+    </params>
+  </async>
+  <fetchers>
+    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+      <params>
+        <name>fsf</name>
+        <basePath>{PATH_TO_DOCS}</basePath>
+      </params>
+    </fetcher>
+  </fetchers>
+  <emitters>
+    <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+      <params>
+        <name>se</name>
+        {SOLR_CONNECTION}
+        <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
+        <solrCollection>testcol</solrCollection>
+        <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
+        <contentField>content</contentField>
+        <commitWithin>10</commitWithin>
+        <idField>id</idField>
+        <connectionTimeout>10000</connectionTimeout>
+        <socketTimeout>60000</socketTimeout>
+      </params>
+    </emitter>
+    <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+      <params>
+        <name>fse</name>
+        <basePath>/path/to/extracts</basePath>
+      </params>
+    </emitter>
+  </emitters>
+  <pipesIterator class="org.apache.tika.pipes.solrtest.SolrPipesIterator">
+    <params>
+      <solrCollection>testcol</solrCollection>
+      {SOLR_CONNECTION}
+      <idField>id</idField>
+      <parsingIdField>parsing_id_i</parsingIdField>
+      <failCountField>fail_count_i</failCountField>
+      <sizeFieldName>size_i</sizeFieldName>
+      <rows>10</rows>
+      <fetcherName>fsf</fetcherName>
+      <emitterName>se</emitterName>
+    </params>
+  </pipesIterator>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml 
b/tika-pipes/tika-pipes-iterators/pom.xml
index 2600ff6..c929428 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -39,5 +39,6 @@
     <module>tika-pipes-iterator-csv</module>
     <module>tika-pipes-iterator-jdbc</module>
     <module>tika-pipes-iterator-s3</module>
+    <module>tika-pipes-iterator-solr</module>
   </modules>
 </project>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
similarity index 71%
copy from tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
copy to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
index 074152e..cb80a34 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
@@ -21,13 +21,17 @@
          xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
-    <artifactId>tika-emitters</artifactId>
     <groupId>org.apache.tika</groupId>
+    <artifactId>tika-pipes-iterators</artifactId>
     <version>2.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-emitter-solr</artifactId>
+  <artifactId>tika-pipes-iterator-solr</artifactId>
+
+  <name>Apache Tika Fetch Iterator - Solr</name>
+  <url>https://tika.apache.org/</url>
 
   <dependencies>
     <dependency>
@@ -37,18 +41,57 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>tika-httpclient-commons</artifactId>
-      <version>${project.version}</version>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-s3</artifactId>
+      <version>${aws.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>${commons.codec.version}</version>
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
+      <artifactId>jackson-databind</artifactId>
       <version>${jackson.version}</version>
     </dependency>
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <version>${commons.logging.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
+      <version>${log4j2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.solr</groupId>
+      <artifactId>solr-solrj</artifactId>
+      <version>${solrj.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-httpclient-commons</artifactId>
+      <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
@@ -56,7 +99,6 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-
   <build>
     <plugins>
       <plugin>
@@ -65,7 +107,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              
<Automatic-Module-Name>org.apache.tika.pipes.emitter.solr</Automatic-Module-Name>
+              
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.solr</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
@@ -125,4 +167,4 @@
 
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git 
a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
new file mode 100644
index 0000000..78d6a49
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.params.CursorMarkParams;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iterates through results from a Solr query.
+ */
+public class SolrPipesIterator extends PipesIterator implements Initializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SolrPipesIterator.class);
+
+    /** Name of the Solr collection that is queried and iterated. */
+    private String solrCollection;
+    /**
+     * You can specify solrUrls, or you can specify solrZkHosts and use zookeeper to determine
+     * the solr server urls. createSolrClient() uses solrUrls whenever the list is non-empty.
+     */
+    private List<String> solrUrls = Collections.emptyList();
+    /** ZooKeeper hosts used to locate Solr when no solrUrls are configured. */
+    private List<String> solrZkHosts = Collections.emptyList();
+    /** Optional ZooKeeper chroot for the cloud client; may be null. */
+    private String solrZkChroot;
+    /** Filter queries (fq) applied to the iteration query. */
+    private List<String> filters = Collections.emptyList();
+    /** Field whose value is used as the fetch key and the emit key of each tuple. */
+    private String idField;
+    /** Primary sort field of the iteration query. */
+    private String parsingIdField;
+    // NOTE(review): in the code visible here, failCountField and sizeFieldName are only added to
+    // the requested field list; confirm what is meant to consume them.
+    private String failCountField;
+    private String sizeFieldName;
+    /** Extra fields requested from Solr with every document. */
+    private List<String> additionalFields = Collections.emptyList();
+    /** Number of documents requested per cursorMark page. */
+    private int rows = 5000;
+    // Passed to the SolrJ client builders; presumably milliseconds -- confirm against SolrJ docs.
+    private int connectionTimeout = 10000;
+    private int socketTimeout = 60000;
+
+    /** Builds the HTTP client handed to the Solr client; configured by the auth/proxy setters. */
+    private final HttpClientFactory httpClientFactory;
+
+    public SolrPipesIterator() throws TikaConfigException {
+        httpClientFactory = new HttpClientFactory();
+    }
+
+    /** ZooKeeper host(s) used to discover Solr when no solrUrls are configured; null means none. */
+    @Field
+    public void setSolrZkHosts(List<String> solrZkHosts) {
+        this.solrZkHosts = nullToEmpty(solrZkHosts);
+    }
+
+    /** Optional ZooKeeper chroot passed to the cloud client. */
+    @Field
+    public void setSolrZkChroot(String solrZkChroot) {
+        this.solrZkChroot = solrZkChroot;
+    }
+
+    /** Collection to iterate. */
+    @Field
+    public void setSolrCollection(String solrCollection) {
+        this.solrCollection = solrCollection;
+    }
+
+    /** Solr base URLs; when non-empty they are used instead of solrZkHosts. Null means none. */
+    @Field
+    public void setSolrUrls(List<String> solrUrls) {
+        this.solrUrls = nullToEmpty(solrUrls);
+    }
+
+    /** Filter queries (fq) restricting which documents are iterated; null means no filters. */
+    @Field
+    public void setFilters(List<String> filters) {
+        this.filters = nullToEmpty(filters);
+    }
+
+    /** Extra fields requested from Solr with every document; null means none. */
+    @Field
+    public void setAdditionalFields(List<String> additionalFields) {
+        this.additionalFields = nullToEmpty(additionalFields);
+    }
+
+    /** Field whose value becomes the fetch key and emit key of each tuple. */
+    @Field
+    public void setIdField(String idField) {
+        this.idField = idField;
+    }
+
+    /** Field used as the primary sort key of the iteration query. */
+    @Field
+    public void setParsingIdField(String parsingIdField) {
+        this.parsingIdField = parsingIdField;
+    }
+
+    /** Field requested with each document (see the NOTE on the field declaration). */
+    @Field
+    public void setFailCountField(String failCountField) {
+        this.failCountField = failCountField;
+    }
+
+    /** Field requested with each document (see the NOTE on the field declaration). */
+    @Field
+    public void setSizeFieldName(String sizeFieldName) {
+        this.sizeFieldName = sizeFieldName;
+    }
+
+    /** Number of documents requested per cursorMark page. */
+    @Field
+    public void setRows(int rows) {
+        this.rows = rows;
+    }
+
+    /** Connection timeout handed to the SolrJ client builder (presumably milliseconds). */
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    /** Socket timeout handed to the SolrJ client builder (presumably milliseconds). */
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    //TODO -- add other httpclient configurations??
+    /** Forwarded to the HTTP client factory. */
+    @Field
+    public void setUserName(String userName) {
+        httpClientFactory.setUserName(userName);
+    }
+
+    /** Forwarded to the HTTP client factory. */
+    @Field
+    public void setPassword(String password) {
+        httpClientFactory.setPassword(password);
+    }
+
+    /** Forwarded to the HTTP client factory. */
+    @Field
+    public void setAuthScheme(String authScheme) {
+        httpClientFactory.setAuthScheme(authScheme);
+    }
+
+    /** Forwarded to the HTTP client factory. */
+    @Field
+    public void setProxyHost(String proxyHost) {
+        httpClientFactory.setProxyHost(proxyHost);
+    }
+
+    /** Forwarded to the HTTP client factory. */
+    @Field
+    public void setProxyPort(int proxyPort) {
+        httpClientFactory.setProxyPort(proxyPort);
+    }
+
+    /**
+     * Treats an absent list parameter as empty, so enqueue() never calls toArray/addAll on null
+     * and the "is this list configured" checks keep working.
+     */
+    private static List<String> nullToEmpty(List<String> values) {
+        return values == null ? Collections.<String>emptyList() : values;
+    }
+
+    /**
+     * Pages through every document of {@code solrCollection} matching {@code filters} using Solr
+     * cursorMark deep paging ({@code rows} documents per request), and enqueues one
+     * {@link FetchEmitTuple} per document. The value of {@code idField} is used as the tuple id,
+     * the fetch key and the emit key.
+     *
+     * @throws IOException if Solr cannot be queried or the Solr client cannot be built
+     */
+    @Override
+    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+        String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
+
+        try (SolrClient solrClient = createSolrClient()) {
+            int fileCount = 0;
+
+            SolrQuery query = new SolrQuery();
+            query.set("q", "*:*");
+            query.setRows(rows);
+
+            Set<String> allFields = new HashSet<>();
+            allFields.add("id");
+            allFields.add(idField);
+            allFields.add(parsingIdField);
+            allFields.add(failCountField);
+            allFields.add(sizeFieldName);
+            allFields.addAll(additionalFields);
+
+            query.setFields(allFields.toArray(new String[0]));
+            query.setSort(SolrQuery.SortClause.asc(parsingIdField));
+            // cursorMark paging requires the uniqueKey as the final sort clause; this assumes
+            // the collection's uniqueKey field is literally "id".
+            query.addSort(SolrQuery.SortClause.asc("id"));
+            query.setFilterQueries(filters.toArray(new String[0]));
+
+            HandlerConfig handlerConfig = getHandlerConfig();
+
+            String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+            while (true) {
+                query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+                QueryResponse qr = solrClient.query(solrCollection, query);
+                long totalToFetch = qr.getResults().getNumFound();
+                String nextCursorMark = qr.getNextCursorMark();
+                LOGGER.info("Query to fetch files to parse collection={}, q={}, onCount={}, totalCount={}",
+                        solrCollection, query, fileCount, totalToFetch);
+                for (SolrDocument sd : qr.getResults()) {
+                    ++fileCount;
+                    // Convert rather than cast: the id field need not be a string-typed field.
+                    Object idValue = sd.getFieldValue(idField);
+                    String fetchKey = idValue == null ? null : idValue.toString();
+                    LOGGER.debug("iterator doc: {}, idField={}, fetchKey={}", sd, idField, fetchKey);
+                    // NOTE(review): the other requested field values are not forwarded as
+                    // metadata; an empty Metadata is passed, exactly as before. The former
+                    // per-field (String) casts that built a discarded Metadata threw
+                    // ClassCastException on numeric fields such as parsing_id_i and were removed.
+                    tryToAdd(new FetchEmitTuple(fetchKey,
+                            new FetchKey(fetcherName, fetchKey),
+                            new EmitKey(emitterName, fetchKey),
+                            new Metadata(),
+                            handlerConfig,
+                            getOnParseException()));
+                }
+                // Solr signals the end of the result set by returning the cursorMark it was sent.
+                if (cursorMark.equals(nextCursorMark)) {
+                    break;
+                }
+                cursorMark = nextCursorMark;
+            }
+        } catch (SolrServerException | TikaConfigException e) {
+            // Propagate instead of only logging, so callers can tell the iteration was incomplete.
+            throw new IOException("Could not iterate through solr collection " + solrCollection, e);
+        }
+    }
+
+    /**
+     * Builds the SolrJ client used to query {@code solrCollection}.
+     * <p>
+     * If at least one entry is present in {@code solrUrls}, a load-balancing HTTP client over
+     * those base URLs is returned. Otherwise a cloud client is built from {@code solrZkHosts}
+     * and the (possibly null) {@code solrZkChroot}. Both variants get the configured
+     * connection/socket timeouts and an HTTP client from {@code httpClientFactory.build()}.
+     *
+     * @return a newly built client
+     * @throws TikaConfigException declared by this method; presumably raised by
+     *         {@code httpClientFactory.build()} -- TODO confirm
+     */
+    private SolrClient createSolrClient() throws TikaConfigException {
+        boolean haveBaseUrls = solrUrls != null && !solrUrls.isEmpty();
+        if (haveBaseUrls) {
+            String[] baseUrls = solrUrls.toArray(new String[0]);
+            return new LBHttpSolrClient.Builder()
+                    .withConnectionTimeout(connectionTimeout)
+                    .withSocketTimeout(socketTimeout)
+                    .withHttpClient(httpClientFactory.build())
+                    .withBaseSolrUrls(baseUrls)
+                    .build();
+        }
+        // no direct URLs configured: fall back to ZooKeeper-based discovery
+        return new CloudSolrClient.Builder(solrZkHosts, Optional.ofNullable(solrZkChroot))
+                .withHttpClient(httpClientFactory.build())
+                .withConnectionTimeout(connectionTimeout)
+                .withSocketTimeout(socketTimeout)
+                .build();
+    }
+
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        super.checkInitialization(problemHandler);
+        mustNotBeEmpty("solrCollection", this.solrCollection);
+        mustNotBeEmpty("urlFieldName", this.idField);
+        mustNotBeEmpty("parsingIdField", this.parsingIdField);
+        mustNotBeEmpty("failCountField", this.failCountField);
+        mustNotBeEmpty("sizeFieldName", this.sizeFieldName);
+        if ((this.solrUrls == null || this.solrUrls.isEmpty()) && 
(this.solrZkHosts == null || this.solrZkHosts.isEmpty())) {
+            throw new IllegalArgumentException("expected either param solrUrls 
or param solrZkHosts, but neither was specified");
+        }
+        if (this.solrUrls != null && !this.solrUrls.isEmpty() && 
this.solrZkHosts != null && !this.solrZkHosts.isEmpty()) {
+            throw new IllegalArgumentException("expected either param solrUrls 
or param solrZkHosts, but both were specified");
+        }
+    }
+}

Reply via email to