This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 134bbf8 Add more solr support to tika pipes (#445)
134bbf8 is described below
commit 134bbf876dcf39eb50fb6a456893d0e2cb228c17
Author: Nicholas DiPiazza <[email protected]>
AuthorDate: Mon May 17 11:37:07 2021 -0500
Add more solr support to tika pipes (#445)
* Add solr pipe iterator
Add solr emitter
Add end-to-end tika async test with solr pipe iterator, file system
fetcher, and solr emitter.
* fix styles
* resolve some enforcer findings
---
.../java/org/apache/tika/pipes/PipesClient.java | 2 +
tika-parent/pom.xml | 2 +
tika-pipes/tika-emitters/tika-emitter-solr/pom.xml | 35 ++-
.../tika/pipes/emitter/solr/SolrEmitter.java | 325 +++++++++++----------
.../apache/tika/pipes/emitter/solr/TestBasic.java | 94 ------
tika-pipes/tika-pipes-integration-tests/pom.xml | 30 ++
.../tika/pipes/solrtest/TikaPipesSolr6Test.java | 46 +++
.../tika/pipes/solrtest/TikaPipesSolr7Test.java | 47 +++
.../tika/pipes/solrtest/TikaPipesSolr8Test.java | 46 +++
.../tika/pipes/solrtest/TikaPipesSolrTestBase.java | 155 ++++++++++
.../src/test/resources/tika-async-log4j.properties | 13 +
.../src/test/resources/tika-config-solr-urls.xml | 103 +++++++
tika-pipes/tika-pipes-iterators/pom.xml | 1 +
.../tika-pipes-iterator-solr}/pom.xml | 60 +++-
.../tika/pipes/solrtest/SolrPipesIterator.java | 264 +++++++++++++++++
15 files changed, 970 insertions(+), 253 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index ac46270..2db98b2 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -342,6 +342,8 @@ public class PipesClient implements Closeable {
SERVER_LOG.warn(line.substring(5));
} else if (line.startsWith("error ")) {
SERVER_LOG.error(line.substring(6));
+ } else {
+ SERVER_LOG.error(line);
}
try {
line = reader.readLine();
diff --git a/tika-parent/pom.xml b/tika-parent/pom.xml
index 66b5b99..4c5e38c 100644
--- a/tika-parent/pom.xml
+++ b/tika-parent/pom.xml
@@ -347,9 +347,11 @@
<sis.version>1.0</sis.version>
<!-- we'll need to stay on 1.7 until we're java modularized ? -->
<slf4j.version>1.7.30</slf4j.version>
+ <solrj.version>8.8.2</solrj.version>
<spring.version>5.3.3</spring.version>
<sqlite.version>3.34.0</sqlite.version>
<tagsoup.version>1.2.1</tagsoup.version>
+ <test.containers.version>1.15.2</test.containers.version>
<!-- NOTE: sync tukaani version with commons-compress in tika-parent-->
<tukaani.version>1.8</tukaani.version>
<twelvemonkeys.version>3.6.1</twelvemonkeys.version>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
index 074152e..6d0d485 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
@@ -51,6 +51,35 @@
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ <version>${jetty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solrj.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
@@ -103,17 +132,17 @@
</filters>
<transformers>
<transformer
-
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/LICENSE</resource>
<file>target/classes/META-INF/LICENSE</file>
</transformer>
<transformer
-
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/NOTICE</resource>
<file>target/classes/META-INF/NOTICE</file>
</transformer>
<transformer
-
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
+
implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>META-INF/DEPENDENCIES</resource>
<file>target/classes/META-INF/DEPENDENCIES</file>
</transformer>
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
index 0d9cd35..78eca0d 100644
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-solr/src/main/java/org/apache/tika/pipes/emitter/solr/SolrEmitter.java
@@ -16,26 +16,22 @@
*/
package org.apache.tika.pipes.emitter.solr;
-import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
-import java.util.zip.GZIPOutputStream;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.http.client.HttpClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.tika.client.HttpClientFactory;
-import org.apache.tika.client.HttpClientUtil;
-import org.apache.tika.client.TikaClientException;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -45,87 +41,72 @@ import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.TikaEmitterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SolrEmitter extends AbstractEmitter implements Initializable {
- private static final String ATTACHMENTS = "attachments";
- private static final String UPDATE_PATH = "/update";
+ public enum AttachmentStrategy {
+ SKIP,
+ CONCATENATE_CONTENT,
+ PARENT_CHILD,
+ //anything else?
+ }
+
+ public enum UpdateStrategy {
+ ADD,
+ UPDATE_MUST_EXIST,
+ UPDATE_MUST_NOT_EXIST,
+ }
+
private static final Logger LOG =
LoggerFactory.getLogger(SolrEmitter.class);
- //one day this will be allowed or can be configured?
- private final boolean gzipJson = false;
+
private AttachmentStrategy attachmentStrategy =
AttachmentStrategy.PARENT_CHILD;
- private String url;
+ private UpdateStrategy updateStrategy = UpdateStrategy.ADD;
+ private String solrCollection;
+ /**
+ * You can specify solrUrls, or you can specify solrZkHosts and use
zookeeper to determine the solr server urls.
+ */
+ private List<String> solrUrls;
+ private List<String> solrZkHosts;
+ private String solrZkChroot;
private String contentField = "content";
private String idField = "id";
- private int commitWithin = 100;
- private HttpClientFactory httpClientFactory;
- private HttpClient httpClient;
+ private int commitWithin = 1000;
+ private int connectionTimeout = 10000;
+ private int socketTimeout = 60000;
+ private final HttpClientFactory httpClientFactory;
+ private SolrClient solrClient;
+
public SolrEmitter() throws TikaConfigException {
httpClientFactory = new HttpClientFactory();
}
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
- throws IOException, TikaEmitterException {
-
+ public void emit(String emitKey, List<Metadata> metadataList) throws
IOException,
+ TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
LOG.warn("metadataList is null or empty");
return;
}
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Writer writer = gzipJson ? new BufferedWriter(
- new OutputStreamWriter(new GZIPOutputStream(bos),
StandardCharsets.UTF_8)) :
- new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8));
- try (JsonGenerator jsonGenerator = new
JsonFactory().createGenerator(writer)) {
- jsonGenerator.writeStartArray();
- jsonify(jsonGenerator, emitKey, metadataList);
- jsonGenerator.writeEndArray();
- }
- LOG.debug("emitting json ({})", new String(bos.toByteArray(),
StandardCharsets.UTF_8));
- try {
- HttpClientUtil
- .postJson(httpClient, url + UPDATE_PATH +
- "?commitWithin=" + getCommitWithin(),
- bos.toByteArray(), gzipJson);
- } catch (TikaClientException e) {
- throw new TikaEmitterException("can't post", e);
- }
+ List<SolrInputDocument> docsToUpdate = new ArrayList<>();
+ addMetadataAsSolrInputDocuments(emitKey, metadataList, docsToUpdate);
+ emitSolrBatch(docsToUpdate);
}
- @Override
- public void emit(List<? extends EmitData> batch) throws IOException,
TikaEmitterException {
- if (batch == null || batch.size() == 0) {
- LOG.warn("batch is null or empty");
- return;
- }
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Writer writer = gzipJson ? new BufferedWriter(
- new OutputStreamWriter(new GZIPOutputStream(bos),
StandardCharsets.UTF_8)) :
- new BufferedWriter(new OutputStreamWriter(bos,
StandardCharsets.UTF_8));
- try (JsonGenerator jsonGenerator = new
JsonFactory().createGenerator(writer)) {
- jsonGenerator.writeStartArray();
- for (EmitData d : batch) {
- jsonify(jsonGenerator, d.getEmitKey().getEmitKey(),
d.getMetadataList());
- }
- jsonGenerator.writeEndArray();
- }
- LOG.debug("emitting json ({})", new String(bos.toByteArray(),
StandardCharsets.UTF_8));
- try {
- HttpClientUtil
- .postJson(httpClient, url + UPDATE_PATH +
- "?commitWithin=" + getCommitWithin(),
- bos.toByteArray(), gzipJson);
- } catch (TikaClientException e) {
- throw new TikaEmitterException("can't post", e);
+ private void addMetadataAsSolrInputDocuments(String emitKey,
List<Metadata> metadataList,
+ List<SolrInputDocument>
docsToUpdate)
+ throws IOException, TikaEmitterException {
+ SolrInputDocument solrInputDocument = new SolrInputDocument();
+ solrInputDocument.setField(idField, emitKey);
+ if (updateStrategy == UpdateStrategy.UPDATE_MUST_EXIST) {
+ solrInputDocument.setField("_version_", 1);
+ } else if (updateStrategy == UpdateStrategy.UPDATE_MUST_NOT_EXIST) {
+ solrInputDocument.setField("_version_", -1);
}
- }
-
- private void jsonify(JsonGenerator jsonGenerator, String emitKey,
- List<Metadata> metadataList)
- throws IOException {
- metadataList.get(0).set(idField, emitKey);
- if (attachmentStrategy == AttachmentStrategy.SKIP ||
metadataList.size() == 1) {
- jsonify(metadataList.get(0), jsonGenerator);
+ if (attachmentStrategy == AttachmentStrategy.SKIP ||
+ metadataList.size() == 1) {
+ addMetadataToSolrInputDocument(metadataList.get(0),
solrInputDocument, updateStrategy);
} else if (attachmentStrategy ==
AttachmentStrategy.CONCATENATE_CONTENT) {
//this only handles text for now, not xhtml
StringBuilder sb = new StringBuilder();
@@ -137,95 +118,106 @@ public class SolrEmitter extends AbstractEmitter
implements Initializable {
}
Metadata parent = metadataList.get(0);
parent.set(getContentField(), sb.toString());
- jsonify(parent, jsonGenerator);
+ addMetadataToSolrInputDocument(parent, solrInputDocument,
updateStrategy);
} else if (attachmentStrategy == AttachmentStrategy.PARENT_CHILD) {
- jsonify(metadataList.get(0), jsonGenerator, false);
- jsonGenerator.writeArrayFieldStart(ATTACHMENTS);
-
+ addMetadataToSolrInputDocument(metadataList.get(0),
solrInputDocument, updateStrategy);
for (int i = 1; i < metadataList.size(); i++) {
+ SolrInputDocument childSolrInputDocument = new
SolrInputDocument();
Metadata m = metadataList.get(i);
- m.set(idField, UUID.randomUUID().toString());
- jsonify(m, jsonGenerator);
+ childSolrInputDocument.setField(idField,
UUID.randomUUID().toString());
+ addMetadataToSolrInputDocument(m, childSolrInputDocument,
updateStrategy);
}
- jsonGenerator.writeEndArray();
- jsonGenerator.writeEndObject();
} else {
- throw new IllegalArgumentException(
- "I don't yet support this attachment strategy: " +
attachmentStrategy);
+ throw new IllegalArgumentException("I don't yet support this
attachment strategy: "
+ + attachmentStrategy);
}
+ docsToUpdate.add(solrInputDocument);
}
- private void jsonify(Metadata metadata, JsonGenerator jsonGenerator,
boolean writeEndObject)
- throws IOException {
- jsonGenerator.writeStartObject();
+ @Override
+ public void emit(List<? extends EmitData> batch) throws IOException,
TikaEmitterException {
+ if (batch == null || batch.size() == 0) {
+ LOG.warn("batch is null or empty");
+ return;
+ }
+ List<SolrInputDocument> docsToUpdate = new ArrayList<>();
+ for (EmitData d : batch) {
+ addMetadataAsSolrInputDocuments(d.getEmitKey().getEmitKey(),
d.getMetadataList(), docsToUpdate);
+ }
+ emitSolrBatch(docsToUpdate);
+ }
+
+ private void emitSolrBatch(List<SolrInputDocument> docsToUpdate) throws
IOException, TikaEmitterException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Emitting solr doc batch: {}", docsToUpdate);
+ }
+ if (!docsToUpdate.isEmpty()) {
+ try {
+ UpdateRequest req = new UpdateRequest();
+ req.add(docsToUpdate);
+ req.setCommitWithin(commitWithin);
+ req.setParam("failOnVersionConflicts", "false");
+ req.process(solrClient, solrCollection);
+ } catch (Exception e) {
+ throw new TikaEmitterException("Could not add batch to solr",
e);
+ }
+ }
+ }
+
+ private void addMetadataToSolrInputDocument(Metadata metadata,
SolrInputDocument solrInputDocument,
+ UpdateStrategy updateStrategy)
{
for (String n : metadata.names()) {
String[] vals = metadata.getValues(n);
if (vals.length == 0) {
continue;
} else if (vals.length == 1) {
- jsonGenerator.writeStringField(n, vals[0]);
+ if (updateStrategy == UpdateStrategy.ADD) {
+ solrInputDocument.setField(n, vals[0]);
+ } else {
+ solrInputDocument.setField(n, new HashMap<String,
String>() {{
+ put("set", vals[0]);
+ }});
+ }
} else if (vals.length > 1) {
- jsonGenerator.writeArrayFieldStart(n);
- for (String val : vals) {
- jsonGenerator.writeString(val);
+ if (updateStrategy == UpdateStrategy.ADD) {
+ solrInputDocument.setField(n, vals);
+ } else {
+ solrInputDocument.setField(n, new HashMap<String,
String[]>() {{
+ put("set", vals);
+ }});
}
- jsonGenerator.writeEndArray();
}
}
- if (writeEndObject) {
- jsonGenerator.writeEndObject();
- }
- }
-
- private void jsonify(Metadata metadata, JsonGenerator jsonGenerator)
throws IOException {
- jsonify(metadata, jsonGenerator, true);
}
/**
- * Options: "skip", "concatenate-content", "parent-child". Default is
"parent-child".
- * If set to "skip", this will index only the main file and ignore all info
- * in the attachments. If set to "concatenate", this will concatenate the
+ * Options: SKIP, CONCATENATE_CONTENT, PARENT_CHILD. Default is
"PARENT_CHILD".
+ * If set to "SKIP", this will index only the main file and ignore all info
+ * in the attachments. If set to "CONCATENATE_CONTENT", this will
concatenate the
* content extracted from the attachments into the main document and
* then index the main document with the concatenated content _and_ the
* main document's metadata (metadata from attachments will be thrown
away).
- * If set to "parent-child", this will index the attachments as children
+ * If set to "PARENT_CHILD", this will index the attachments as children
* of the parent document via Solr's parent-child relationship.
- *
- * @param attachmentStrategy
*/
@Field
public void setAttachmentStrategy(String attachmentStrategy) {
- switch (attachmentStrategy) {
- case "skip":
- this.attachmentStrategy = AttachmentStrategy.SKIP;
- break;
- case "concatenate-content":
- this.attachmentStrategy =
AttachmentStrategy.CONCATENATE_CONTENT;
- break;
- case "parent-child":
- this.attachmentStrategy = AttachmentStrategy.PARENT_CHILD;
- break;
- default:
- throw new IllegalArgumentException("Expected 'skip',
'concatenate-content' or " +
- "'parent-child'. I regret I do not recognize: " +
attachmentStrategy);
- }
+ this.attachmentStrategy =
AttachmentStrategy.valueOf(attachmentStrategy);
}
- /**
- * Specify the url for Solr
- *
- * @param url
- */
@Field
- public void setUrl(String url) {
- if (url.endsWith("/")) {
- url = url.substring(0, url.length() - 1);
- }
- this.url = url;
+ public void setUpdateStrategy(String updateStrategy) {
+ this.updateStrategy = UpdateStrategy.valueOf(updateStrategy);
}
- public String getContentField() {
- return contentField;
+ @Field
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ @Field
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
}
/**
@@ -242,8 +234,8 @@ public class SolrEmitter extends AbstractEmitter implements
Initializable {
this.contentField = contentField;
}
- public int getCommitWithin() {
- return commitWithin;
+ public String getContentField() {
+ return contentField;
}
@Field
@@ -251,6 +243,10 @@ public class SolrEmitter extends AbstractEmitter
implements Initializable {
this.commitWithin = commitWithin;
}
+ public int getCommitWithin() {
+ return commitWithin;
+ }
+
/**
* Specify the field in the first Metadata that should be
* used as the id field for the document.
@@ -262,7 +258,27 @@ public class SolrEmitter extends AbstractEmitter
implements Initializable {
this.idField = idField;
}
- //TODO -- add other httpclient configurations
+ @Field
+ public void setSolrCollection(String solrCollection) {
+ this.solrCollection = solrCollection;
+ }
+
+ @Field
+ public void setSolrUrls(List<String> solrUrls) {
+ this.solrUrls = solrUrls;
+ }
+
+ @Field
+ public void setSolrZkHosts(List<String> solrZkHosts) {
+ this.solrZkHosts = solrZkHosts;
+ }
+
+ @Field
+ public void setSolrZkChroot(String solrZkChroot) {
+ this.solrZkChroot = solrZkChroot;
+ }
+
+ //TODO -- add other httpclient configurations??
@Field
public void setUserName(String userName) {
httpClientFactory.setUserName(userName);
@@ -290,19 +306,34 @@ public class SolrEmitter extends AbstractEmitter
implements Initializable {
@Override
public void initialize(Map<String, Param> params) throws
TikaConfigException {
- //TODO: build the client here?
- httpClient = httpClientFactory.build();
+ if (solrUrls == null || solrUrls.isEmpty()) {
+ solrClient = new CloudSolrClient.Builder(solrZkHosts,
Optional.ofNullable(solrZkChroot))
+ .withConnectionTimeout(connectionTimeout)
+ .withSocketTimeout(socketTimeout)
+ .withHttpClient(httpClientFactory.build())
+ .build();
+ } else {
+ solrClient = new LBHttpSolrClient.Builder()
+ .withConnectionTimeout(connectionTimeout)
+ .withSocketTimeout(socketTimeout)
+ .withHttpClient(httpClientFactory.build())
+ .withBaseSolrUrls(solrUrls.toArray(new String[]
{})).build();
+ }
}
@Override
- public void checkInitialization(InitializableProblemHandler problemHandler)
- throws TikaConfigException {
-
- }
-
- enum AttachmentStrategy {
- SKIP, CONCATENATE_CONTENT, PARENT_CHILD,
- //anything else?
+ public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
+ mustNotBeEmpty("solrCollection", this.solrCollection);
+ mustNotBeEmpty("urlFieldName", this.idField);
+ if ((this.solrUrls == null || this.solrUrls.isEmpty()) &&
+ (this.solrZkHosts == null || this.solrZkHosts.isEmpty())) {
+ throw new IllegalArgumentException(
+ "expected either param solrUrls or param solrZkHosts, but
neither was specified");
+ }
+ if (this.solrUrls != null && !this.solrUrls.isEmpty() &&
this.solrZkHosts != null &&
+ !this.solrZkHosts.isEmpty()) {
+ throw new IllegalArgumentException(
+ "expected either param solrUrls or param solrZkHosts, but
both were specified");
+ }
}
-
}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
b/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
deleted file mode 100644
index d88947e..0000000
---
a/tika-pipes/tika-emitters/tika-emitter-solr/src/test/java/org/apache/tika/pipes/emitter/solr/TestBasic.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.emitter.solr;
-
-
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.tika.config.TikaConfig;
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.metadata.filter.MetadataFilter;
-import org.apache.tika.pipes.emitter.EmitData;
-import org.apache.tika.pipes.emitter.EmitKey;
-import org.apache.tika.pipes.emitter.Emitter;
-import org.apache.tika.pipes.emitter.EmitterManager;
-
-@Ignore("requires solr to be up and running; please dockerize some tests,
please, please")
-public class TestBasic {
-
- @Test
- public void testBasic() throws Exception {
- TikaConfig tikaConfig = new TikaConfig(
-
TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
- EmitterManager emitterManager = EmitterManager.load(
-
Paths.get(TestBasic.class.getResource("/tika-config-simple-emitter.xml").toURI())
- );
- Emitter emitter = emitterManager.getEmitter("solr1");
- List<Metadata> metadataList = getParentChild(tikaConfig, "id1", 2);
-
- emitter.emit("1", metadataList);
- }
-
- @Test
- public void testBatch() throws Exception {
- TikaConfig tikaConfig = new TikaConfig(
-
TestBasic.class.getResourceAsStream("/tika-config-simple-emitter.xml"));
- EmitterManager emitterManager = EmitterManager.load(
-
Paths.get(TestBasic.class.getResource("/tika-config-simple-emitter.xml").toURI())
- );
- Emitter emitter = emitterManager.getEmitter("solr2");
- List<EmitData> emitData = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- List<Metadata> metadataList = getParentChild(tikaConfig, "batch_"
+ i, 4);
- emitData.add(new EmitData(new EmitKey(emitter.getName(),
- "batch_" + i), metadataList));
- }
- emitter.emit(emitData);
- }
-
- private List<Metadata> getParentChild(TikaConfig tikaConfig, String id,
int numChildren)
- throws TikaException {
- List<Metadata> metadataList = new ArrayList<>();
- MetadataFilter filter = tikaConfig.getMetadataFilter();
-
- Metadata m1 = new Metadata();
- m1.set("id", id);
- m1.set(Metadata.CONTENT_LENGTH, "314159");
- m1.set(TikaCoreProperties.TIKA_CONTENT, "the quick brown");
- m1.set(TikaCoreProperties.TITLE, "this is the first title");
- m1.add(TikaCoreProperties.CREATOR, "firstAuthor");
- m1.add(TikaCoreProperties.CREATOR, "secondAuthor");
- filter.filter(m1);
- metadataList.add(m1);
- for (int i = 1; i < numChildren; i++) {
- Metadata m2 = new Metadata();
- m2.set(TikaCoreProperties.EMBEDDED_RESOURCE_PATH,
"/path_to_this.txt");
- m2.set(TikaCoreProperties.TIKA_CONTENT, "fox jumped over the lazy
" + i);
- filter.filter(m2);
- metadataList.add(m2);
- }
- return metadataList;
- }
-
-}
diff --git a/tika-pipes/tika-pipes-integration-tests/pom.xml
b/tika-pipes/tika-pipes-integration-tests/pom.xml
index fbaef59..b4c4036 100644
--- a/tika-pipes/tika-pipes-integration-tests/pom.xml
+++ b/tika-pipes/tika-pipes-integration-tests/pom.xml
@@ -71,6 +71,36 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${test.containers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-pipes-iterator-solr</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-emitter-solr</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-app</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solrj.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java
new file mode 100644
index 0000000..12e9ac7
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr6Test.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr6Test extends TikaPipesSolrTestBase {
+
+ @Rule
+ public GenericContainer<?> solr6 = new
GenericContainer<>(DockerImageName.parse("solr:6"))
+ .withExposedPorts(8983, 9983)
+ .withCommand("-DzkRun");
+
+ @Before
+ public void setupTest() throws Exception {
+ setupSolr(solr6);
+ }
+
+ @Test
+ public void testFetchIteratorWithSolrUrls() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+ }
+
+ @Test
+ public void testFetchIteratorWithZkHost() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+ }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
new file mode 100644
index 0000000..c9cf566
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr7Test.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr7Test extends TikaPipesSolrTestBase {
+
+ @Rule
+ public GenericContainer<?> solr7 = new
GenericContainer<>(DockerImageName.parse("solr:7"))
+ .withExposedPorts(8983, 9983)
+ .withCommand("-DzkRun");
+
+ @Before
+ public void setupTest() throws Exception {
+ setupSolr(solr7);
+ }
+
+ @Test
+ public void testFetchIteratorWithSolrUrls() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+
+ }
+
+ @Test
+ public void testFetchIteratorWithZkHost() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+ }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
new file mode 100644
index 0000000..d1470df
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolr8Test.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class TikaPipesSolr8Test extends TikaPipesSolrTestBase {
+
+ @Rule
+ public GenericContainer<?> solr8 = new
GenericContainer<>(DockerImageName.parse("solr:8"))
+ .withExposedPorts(8983, 9983)
+ .withCommand("-DzkRun");
+
+ @Before
+ public void setupTest() throws Exception {
+ setupSolr(solr8);
+ }
+
+ @Test
+ public void testFetchIteratorWithSolrUrls() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(false);
+ }
+
+ @Test
+ public void testFetchIteratorWithZkHost() throws Exception {
+ runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(true);
+ }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
new file mode 100644
index 0000000..12f3f77
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/solrtest/TikaPipesSolrTestBase.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.tika.cli.TikaCLI;
+import org.apache.tika.pipes.PipeIntegrationTests;
+import org.apache.tika.pipes.emitter.solr.SolrEmitter;
+import org.jetbrains.annotations.NotNull;
+import org.junit.Assert;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+
+/**
+ * Shared fixture for the tika-pipes Solr integration tests.
+ * <p>
+ * Subclasses start a Solr container and pass it to {@link #setupSolr(GenericContainer)}.
+ * That method writes {@code numDocs} HTML files under {@code target/test-files}, creates the
+ * {@code testcol} collection, and indexes one stub document (id + path) per file. The tests
+ * then run tika async twice and check that every stub document was filled in by the emitter.
+ */
+public abstract class TikaPipesSolrTestBase {
+
+    private final String collection = "testcol";
+    private final int numDocs = 42;
+
+    protected GenericContainer<?> solr;
+
+    private final File testFileFolder = new File("target", "test-files");
+    private String solrHost;
+    private int solrPort;
+    private int zkPort;
+    private String solrEndpoint;
+
+    /**
+     * (Re)writes {@code numDocs} files named {@code test-N.html}, each containing
+     * {@code bodyContent} inside a minimal HTML body.
+     */
+    private void createTestHtmlFiles(String bodyContent) throws Exception {
+        testFileFolder.mkdirs();
+        String html = "<html><body>" + bodyContent + "</body></html>";
+        for (int docNum = 0; docNum < numDocs; docNum++) {
+            File target = new File(testFileFolder, "test-" + docNum + ".html");
+            FileUtils.writeStringToFile(target, html, StandardCharsets.UTF_8);
+        }
+    }
+
+    /**
+     * Records the container's mapped ports, creates the test collection inside the container
+     * and indexes one document per test file, keyed by its file name.
+     */
+    protected void setupSolr(GenericContainer<?> solr) throws Exception {
+        createTestHtmlFiles("initial");
+        this.solr = solr;
+        solrHost = solr.getHost();
+        solrPort = solr.getMappedPort(8983);
+        zkPort = solr.getMappedPort(9983);
+        solrEndpoint = "http://" + solrHost + ":" + solrPort + "/solr";
+
+        solr.execInContainer("/opt/solr/bin/solr", "create_collection", "-c", collection);
+
+        try (SolrClient solrClient = newSolrClient()) {
+            for (int docNum = 0; docNum < numDocs; docNum++) {
+                String filename = "test-" + docNum + ".html";
+                SolrInputDocument stub = new SolrInputDocument();
+                stub.setField("id", filename);
+                stub.setField("path", filename);
+                solrClient.add(collection, stub);
+            }
+            solrClient.commit(collection);
+        }
+    }
+
+    /**
+     * Runs a test using Solr Pipe Iterator, File Fetcher and Solr Emitter.
+     * <p>
+     * The first pass indexes with {@code ADD}. The files are then rewritten and a second pass
+     * runs with {@code UPDATE_MUST_EXIST}. Every document already exists at that point, so the
+     * run must succeed and all documents must show the updated content.
+     *
+     * @param useZk If true, use zookeeper to connect to solr. Otherwise use direct solr URLs.
+     */
+    protected void runTikaAsyncSolrPipeIteratorFileFetcherSolrEmitter(boolean useZk)
+            throws Exception {
+        File tikaConfigFile = new File("target", "ta.xml");
+        File log4jPropFile = new File("target", "tmp-log4j.properties");
+        try (InputStream log4jStream = PipeIntegrationTests.class
+                .getResourceAsStream("/tika-async-log4j.properties")) {
+            FileUtils.copyInputStreamToFile(log4jStream, log4jPropFile);
+        }
+        String tikaConfigTemplateXml;
+        try (InputStream templateStream = PipeIntegrationTests.class
+                .getResourceAsStream("/tika-config-solr-urls.xml")) {
+            tikaConfigTemplateXml = IOUtils.toString(templateStream, StandardCharsets.UTF_8);
+        }
+
+        // pass 1: plain ADD of every parsed file
+        runTikaAsync(useZk, tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
+                SolrEmitter.UpdateStrategy.ADD);
+        assertEveryDocMatches("content_s:*initial*");
+
+        // pass 2: change the file contents, then re-emit with UPDATE_MUST_EXIST
+        createTestHtmlFiles("updated");
+        runTikaAsync(useZk, tikaConfigFile, log4jPropFile, tikaConfigTemplateXml,
+                SolrEmitter.UpdateStrategy.UPDATE_MUST_EXIST);
+        assertEveryDocMatches("content_s:*updated*");
+    }
+
+    /** Renders the config for {@code updateStrategy}, writes it to disk and runs tika async. */
+    private void runTikaAsync(boolean useZk,
+                              File tikaConfigFile,
+                              File log4jPropFile,
+                              String tikaConfigTemplateXml,
+                              SolrEmitter.UpdateStrategy updateStrategy) throws Exception {
+        String tikaConfigXml = createTikaConfigXml(useZk, tikaConfigFile, log4jPropFile,
+                tikaConfigTemplateXml, updateStrategy,
+                SolrEmitter.AttachmentStrategy.CONCATENATE_CONTENT);
+        FileUtils.writeStringToFile(tikaConfigFile, tikaConfigXml, StandardCharsets.UTF_8);
+
+        String[] cliArgs = {"-a", "--config=" + tikaConfigFile.getAbsolutePath()};
+        TikaCLI.main(cliArgs);
+    }
+
+    /**
+     * Commits, then asserts that all {@code numDocs} documents have the expected mime type
+     * and match {@code contentQuery}.
+     */
+    private void assertEveryDocMatches(String contentQuery) throws Exception {
+        try (SolrClient solrClient = newSolrClient()) {
+            solrClient.commit(collection);
+            long htmlHits = solrClient
+                    .query(collection, new SolrQuery("mime_s:\"text/html; charset=ISO-8859-1\""))
+                    .getResults().getNumFound();
+            Assert.assertEquals(numDocs, htmlHits);
+            long contentHits = solrClient
+                    .query(collection, new SolrQuery(contentQuery))
+                    .getResults().getNumFound();
+            Assert.assertEquals(numDocs, contentHits);
+        }
+    }
+
+    /** Opens a client against the container's direct Solr endpoint. */
+    private SolrClient newSolrClient() {
+        return new LBHttpSolrClient.Builder().withBaseSolrUrls(solrEndpoint).build();
+    }
+
+    /**
+     * Fills in the placeholders of the config template. {@code {SOLR_CONNECTION}} becomes a
+     * ZooKeeper host list when {@code useZk} is set, otherwise a direct Solr URL list.
+     */
+    @NotNull
+    private String createTikaConfigXml(boolean useZk,
+                                       File tikaConfigFile,
+                                       File log4jPropFile,
+                                       String tikaConfigTemplateXml,
+                                       SolrEmitter.UpdateStrategy updateStrategy,
+                                       SolrEmitter.AttachmentStrategy attachmentStrategy) {
+        String connectionXml;
+        if (useZk) {
+            connectionXml = "<solrZkHosts>\n"
+                    + " <solrZkHost>" + solrHost + ":" + zkPort + "</solrZkHost>\n"
+                    + " </solrZkHosts>\n";
+        } else {
+            connectionXml = "<solrUrls>\n"
+                    + " <solrUrl>http://" + solrHost + ":" + solrPort + "/solr</solrUrl>\n"
+                    + " </solrUrls>\n";
+        }
+        String rendered = tikaConfigTemplateXml
+                .replace("{TIKA_CONFIG}", tikaConfigFile.getAbsolutePath())
+                .replace("{UPDATE_STRATEGY}", updateStrategy.toString())
+                .replace("{ATTACHMENT_STRATEGY}", attachmentStrategy.toString())
+                .replace("{LOG4J_PROPERTIES_FILE}", log4jPropFile.getAbsolutePath())
+                .replace("{PATH_TO_DOCS}", testFileFolder.getAbsolutePath());
+        return rendered.replace("{SOLR_CONNECTION}", connectionXml);
+    }
+}
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
new file mode 100644
index 0000000..c7c6821
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-async-log4j.properties
@@ -0,0 +1,13 @@
+status=debug
+name=PropertiesConfig
+filters=threshold
+filter.threshold.type=ThresholdFilter
+filter.threshold.level=debug
+appenders=console
+appender.console.type=Console
+appender.console.name=STDERR
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%-5p [%t] %d{HH:mm:ss,SSS} %c %m%n
+rootLogger.level=debug
+rootLogger.appenderRefs=stderr
+rootLogger.appenderRef.stderr.ref=STDERR
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
new file mode 100644
index 0000000..0fcb1e7
--- /dev/null
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-solr-urls.xml
@@ -0,0 +1,103 @@
+<properties>
+ <parsers>
+ <parser class="org.apache.tika.parser.DefaultParser">
+ <parser-exclude class="org.apache.tika.parser.ocr.TesseractOCRParser"/>
+ <parser-exclude class="org.apache.tika.parser.pdf.PDFParser"/>
+ <parser-exclude
class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser"/>
+ <parser-exclude class="org.apache.tika.parser.microsoft.OfficeParser"/>
+ </parser>
+ <parser class="org.apache.tika.parser.pdf.PDFParser">
+ <params>
+ <param name="extractActions" type="bool">true</param>
+ <param name="checkExtractAccessPermissions" type="bool">true</param>
+ </params>
+ </parser>
+ <parser class="org.apache.tika.parser.microsoft.ooxml.OOXMLParser">
+ <params>
+ <param name="includeDeletedContent" type="bool">true</param>
+ <param name="includeMoveFromContent" type="bool">true</param>
+ <param name="extractMacros" type="bool">true</param>
+ </params>
+ </parser>
+ <parser class="org.apache.tika.parser.microsoft.OfficeParser">
+ <params>
+ <param name="extractMacros" type="bool">true</param>
+ </params>
+ </parser>
+ </parsers>
+ <metadataFilters>
+ <metadataFilter
class="org.apache.tika.metadata.filter.FieldNameMappingFilter">
+ <params>
+ <excludeUnmapped>true</excludeUnmapped>
+ <mappings>
+ <mapping from="X-TIKA:content" to="content_s"/>
+ <mapping from="Content-Length" to="length_i"/>
+ <mapping from="dc:creator" to="creators_ss"/>
+ <mapping from="dc:title" to="title_s"/>
+ <mapping from="Content-Type" to="mime_s"/>
+ <mapping from="X-TIKA:EXCEPTION:container_exception"
to="tika_exception_s"/>
+ </mappings>
+ </params>
+ </metadataFilter>
+ </metadataFilters>
+ <async>
+ <params>
+ <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+ <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+ <emitWithinMillis>60000</emitWithinMillis>
+ <numEmitters>1</numEmitters>
+ <numClients>1</numClients>
+ <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+ <forkedJvmArgs>
+ <arg>-Xmx1g</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ <arg>-XX:+ExitOnOutOfMemoryError</arg>
+ <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ </params>
+ </async>
+ <fetchers>
+ <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
+ <params>
+ <name>fsf</name>
+ <basePath>{PATH_TO_DOCS}</basePath>
+ </params>
+ </fetcher>
+ </fetchers>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.solr.SolrEmitter">
+ <params>
+ <name>se</name>
+ {SOLR_CONNECTION}
+ <updateStrategy>{UPDATE_STRATEGY}</updateStrategy>
+ <solrCollection>testcol</solrCollection>
+ <attachmentStrategy>{ATTACHMENT_STRATEGY}</attachmentStrategy>
+ <contentField>content</contentField>
+ <commitWithin>10</commitWithin>
+ <idField>id</idField>
+ <connectionTimeout>10000</connectionTimeout>
+ <socketTimeout>60000</socketTimeout>
+ </params>
+ </emitter>
+ <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
+ <params>
+ <name>fse</name>
+ <basePath>/path/to/extracts</basePath>
+ </params>
+ </emitter>
+ </emitters>
+ <pipesIterator class="org.apache.tika.pipes.solrtest.SolrPipesIterator">
+ <params>
+ <solrCollection>testcol</solrCollection>
+ {SOLR_CONNECTION}
+ <idField>id</idField>
+ <parsingIdField>parsing_id_i</parsingIdField>
+ <failCountField>fail_count_i</failCountField>
+ <sizeFieldName>size_i</sizeFieldName>
+ <rows>10</rows>
+ <fetcherName>fsf</fetcherName>
+ <emitterName>se</emitterName>
+ </params>
+ </pipesIterator>
+</properties>
\ No newline at end of file
diff --git a/tika-pipes/tika-pipes-iterators/pom.xml
b/tika-pipes/tika-pipes-iterators/pom.xml
index 2600ff6..c929428 100644
--- a/tika-pipes/tika-pipes-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -39,5 +39,6 @@
<module>tika-pipes-iterator-csv</module>
<module>tika-pipes-iterator-jdbc</module>
<module>tika-pipes-iterator-s3</module>
+ <module>tika-pipes-iterator-solr</module>
</modules>
</project>
diff --git a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
similarity index 71%
copy from tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
copy to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
index 074152e..cb80a34 100644
--- a/tika-pipes/tika-emitters/tika-emitter-solr/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/pom.xml
@@ -21,13 +21,17 @@
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>tika-emitters</artifactId>
<groupId>org.apache.tika</groupId>
+ <artifactId>tika-pipes-iterators</artifactId>
<version>2.0.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-emitter-solr</artifactId>
+ <artifactId>tika-pipes-iterator-solr</artifactId>
+
+ <name>Apache Tika Fetch Iterator - Solr</name>
+ <url>https://tika.apache.org/</url>
<dependencies>
<dependency>
@@ -37,18 +41,57 @@
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>tika-httpclient-commons</artifactId>
- <version>${project.version}</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${commons.codec.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
+ <artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>${commons.logging.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.solr</groupId>
+ <artifactId>solr-solrj</artifactId>
+ <version>${solrj.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>tika-httpclient-commons</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -56,7 +99,6 @@
<scope>test</scope>
</dependency>
</dependencies>
-
<build>
<plugins>
<plugin>
@@ -65,7 +107,7 @@
<configuration>
<archive>
<manifestEntries>
-
<Automatic-Module-Name>org.apache.tika.pipes.emitter.solr</Automatic-Module-Name>
+
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.solr</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
@@ -125,4 +167,4 @@
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git
a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
new file mode 100644
index 0000000..78d6a49
--- /dev/null
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/solrtest/SolrPipesIterator.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.solrtest;
+
+import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.params.CursorMarkParams;
+import org.apache.tika.client.HttpClientFactory;
+import org.apache.tika.config.Field;
+import org.apache.tika.config.Initializable;
+import org.apache.tika.config.InitializableProblemHandler;
+import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
+import org.apache.tika.pipes.HandlerConfig;
+import org.apache.tika.pipes.emitter.EmitKey;
+import org.apache.tika.pipes.fetcher.FetchKey;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Iterates through the documents of a Solr collection and enqueues one
+ * {@link FetchEmitTuple} per document.
+ * <p>
+ * All documents matching the optional filter queries are paged through with Solr cursor
+ * marks, sorted by {@code parsingIdField} and then by {@code id}. The value of
+ * {@code idField} is used as both the fetch key and the emit key.
+ * <p>
+ * Connect either with {@code solrUrls} (direct Solr base URLs) or with {@code solrZkHosts}
+ * (ZooKeeper). Exactly one of the two must be configured.
+ */
+public class SolrPipesIterator extends PipesIterator implements Initializable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SolrPipesIterator.class);
+
+    private String solrCollection;
+    /**
+     * You can specify solrUrls, or you can specify solrZkHosts and use zookeeper to
+     * determine the solr server urls.
+     */
+    private List<String> solrUrls = Collections.emptyList();
+    private List<String> solrZkHosts = Collections.emptyList();
+    private String solrZkChroot;
+    private List<String> filters = Collections.emptyList();
+    private String idField;
+    private String parsingIdField;
+    private String failCountField;
+    private String sizeFieldName;
+    private List<String> additionalFields = Collections.emptyList();
+    private int rows = 5000;
+    private int connectionTimeout = 10000;
+    private int socketTimeout = 60000;
+
+    private final HttpClientFactory httpClientFactory;
+
+    public SolrPipesIterator() throws TikaConfigException {
+        httpClientFactory = new HttpClientFactory();
+    }
+
+    /** ZooKeeper hosts used to locate Solr; mutually exclusive with {@code solrUrls}. */
+    @Field
+    public void setSolrZkHosts(List<String> solrZkHosts) {
+        this.solrZkHosts = solrZkHosts;
+    }
+
+    /** Optional ZooKeeper chroot, used only together with {@code solrZkHosts}. */
+    @Field
+    public void setSolrZkChroot(String solrZkChroot) {
+        this.solrZkChroot = solrZkChroot;
+    }
+
+    /** Collection to query. Required. */
+    @Field
+    public void setSolrCollection(String solrCollection) {
+        this.solrCollection = solrCollection;
+    }
+
+    /** Direct Solr base URLs; mutually exclusive with {@code solrZkHosts}. */
+    @Field
+    public void setSolrUrls(List<String> solrUrls) {
+        this.solrUrls = solrUrls;
+    }
+
+    /** Filter queries ({@code fq}) applied to the iteration query. */
+    @Field
+    public void setFilters(List<String> filters) {
+        this.filters = filters;
+    }
+
+    /** Extra fields to request ({@code fl}) in addition to the configured ones. */
+    @Field
+    public void setAdditionalFields(List<String> additionalFields) {
+        this.additionalFields = additionalFields;
+    }
+
+    /** Field whose value becomes the fetch key and the emit key. Required. */
+    @Field
+    public void setIdField(String idField) {
+        this.idField = idField;
+    }
+
+    /** Field used as the primary sort key. Required. */
+    @Field
+    public void setParsingIdField(String parsingIdField) {
+        this.parsingIdField = parsingIdField;
+    }
+
+    @Field
+    public void setFailCountField(String failCountField) {
+        this.failCountField = failCountField;
+    }
+
+    @Field
+    public void setSizeFieldName(String sizeFieldName) {
+        this.sizeFieldName = sizeFieldName;
+    }
+
+    /** Page size of each cursor-mark request. */
+    @Field
+    public void setRows(int rows) {
+        this.rows = rows;
+    }
+
+    @Field
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    @Field
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    //TODO -- add other httpclient configurations??
+    @Field
+    public void setUserName(String userName) {
+        httpClientFactory.setUserName(userName);
+    }
+
+    @Field
+    public void setPassword(String password) {
+        httpClientFactory.setPassword(password);
+    }
+
+    @Field
+    public void setAuthScheme(String authScheme) {
+        httpClientFactory.setAuthScheme(authScheme);
+    }
+
+    @Field
+    public void setProxyHost(String proxyHost) {
+        httpClientFactory.setProxyHost(proxyHost);
+    }
+
+    @Field
+    public void setProxyPort(int proxyPort) {
+        httpClientFactory.setProxyPort(proxyPort);
+    }
+
+    /**
+     * Pages through the collection with a cursor mark and enqueues a tuple for every
+     * returned document that has a value in {@code idField}.
+     *
+     * @throws IOException if the Solr client cannot be built or a Solr request fails
+     */
+    @Override
+    protected void enqueue() throws InterruptedException, IOException, TimeoutException {
+        String fetcherName = getFetcherName();
+        String emitterName = getEmitterName();
+
+        try (SolrClient solrClient = createSolrClient()) {
+            int fileCount = 0;
+            SolrQuery query = buildQuery();
+            HandlerConfig handlerConfig = getHandlerConfig();
+
+            String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+            boolean done = false;
+            while (!done) {
+                query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+                QueryResponse qr = solrClient.query(solrCollection, query);
+                long totalToFetch = qr.getResults().getNumFound();
+                String nextCursorMark = qr.getNextCursorMark();
+                LOGGER.info("Query to fetch files to parse collection={}, q={}, onCount={}, totalCount={}",
+                        solrCollection, query, fileCount, totalToFetch);
+                for (SolrDocument sd : qr.getResults()) {
+                    ++fileCount;
+                    Object idValue = sd.getFieldValue(idField);
+                    if (idValue == null) {
+                        LOGGER.warn("Skipping document with no value for idField={}: {}",
+                                idField, sd);
+                        continue;
+                    }
+                    // String.valueOf instead of a cast: idField need not be a string field
+                    String fetchKey = String.valueOf(idValue);
+                    String emitKey = fetchKey;
+                    LOGGER.debug("iterator doc: {}, idField={}, fetchKey={}", sd, idField,
+                            fetchKey);
+                    // The retrieved Solr fields are not forwarded; the tuple carries an
+                    // empty Metadata, as it did before.
+                    tryToAdd(new FetchEmitTuple(fetchKey,
+                            new FetchKey(fetcherName, fetchKey),
+                            new EmitKey(emitterName, emitKey),
+                            new Metadata(),
+                            handlerConfig,
+                            getOnParseException()));
+                }
+                // Solr returns the same cursor mark once there are no more results
+                done = cursorMark.equals(nextCursorMark);
+                cursorMark = nextCursorMark;
+            }
+        } catch (SolrServerException | TikaConfigException e) {
+            // surface the failure instead of ending the iteration as if it had completed
+            throw new IOException("Could not iterate through solr collection " + solrCollection,
+                    e);
+        }
+    }
+
+    /**
+     * Builds the match-all query with the requested fields, the configured filter queries and
+     * a sort that supports cursor-mark paging.
+     */
+    private SolrQuery buildQuery() {
+        Set<String> allFields = new HashSet<>();
+        allFields.add("id");
+        allFields.add(idField);
+        allFields.add(parsingIdField);
+        allFields.add(failCountField);
+        allFields.add(sizeFieldName);
+        allFields.addAll(additionalFields);
+
+        SolrQuery query = new SolrQuery();
+        query.set("q", "*:*");
+        query.setRows(rows);
+        query.setFields(allFields.toArray(new String[0]));
+        // Cursor marks require a sort that ends on the uniqueKey field.
+        // NOTE(review): this assumes the collection's uniqueKey is "id".
+        query.setSort(SolrQuery.SortClause.asc(parsingIdField));
+        query.addSort(SolrQuery.SortClause.asc("id"));
+        query.setFilterQueries(filters.toArray(new String[0]));
+        return query;
+    }
+
+    /**
+     * Creates a ZooKeeper-backed {@link CloudSolrClient} when no {@code solrUrls} are set,
+     * otherwise an {@link LBHttpSolrClient} over {@code solrUrls}.
+     */
+    private SolrClient createSolrClient() throws TikaConfigException {
+        if (solrUrls == null || solrUrls.isEmpty()) {
+            return new CloudSolrClient.Builder(solrZkHosts, Optional.ofNullable(solrZkChroot))
+                    .withHttpClient(httpClientFactory.build())
+                    .withConnectionTimeout(connectionTimeout)
+                    .withSocketTimeout(socketTimeout)
+                    .build();
+        }
+        return new LBHttpSolrClient.Builder()
+                .withConnectionTimeout(connectionTimeout)
+                .withSocketTimeout(socketTimeout)
+                .withHttpClient(httpClientFactory.build())
+                .withBaseSolrUrls(solrUrls.toArray(new String[0])).build();
+    }
+
+    /**
+     * Validates the required parameters.
+     *
+     * @throws IllegalArgumentException if neither or both of solrUrls / solrZkHosts are set
+     */
+    @Override
+    public void checkInitialization(InitializableProblemHandler problemHandler)
+            throws TikaConfigException {
+        super.checkInitialization(problemHandler);
+        mustNotBeEmpty("solrCollection", this.solrCollection);
+        mustNotBeEmpty("idField", this.idField);
+        mustNotBeEmpty("parsingIdField", this.parsingIdField);
+        mustNotBeEmpty("failCountField", this.failCountField);
+        mustNotBeEmpty("sizeFieldName", this.sizeFieldName);
+        boolean hasUrls = this.solrUrls != null && !this.solrUrls.isEmpty();
+        boolean hasZkHosts = this.solrZkHosts != null && !this.solrZkHosts.isEmpty();
+        if (!hasUrls && !hasZkHosts) {
+            throw new IllegalArgumentException(
+                    "expected either param solrUrls or param solrZkHosts, but neither was specified");
+        }
+        if (hasUrls && hasZkHosts) {
+            throw new IllegalArgumentException(
+                    "expected either param solrUrls or param solrZkHosts, but both were specified");
+        }
+    }
+}