This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 0fa9adc TIKA-3391 refactor fetchiterators to pipes iterators
0fa9adc is described below
commit 0fa9adc6e88eb56dc9401b512790e685c660106a
Author: tallison <[email protected]>
AuthorDate: Tue May 11 10:44:46 2021 -0400
TIKA-3391 refactor fetchiterators to pipes iterators
---
.../java/org/apache/tika/config/ConfigBase.java | 80 ++++++++++++++++++----
.../apache/tika/pipes/async/AsyncProcessor.java | 4 +-
.../pipes/fetchiterator/FetchIteratorManager.java | 69 -------------------
.../FileSystemPipesIterator.java} | 8 +--
.../PipesIterator.java} | 21 ++++--
.../apache/tika/config/TikaPipesConfigTest.java | 18 ++---
.../tika/pipes/async/AsyncProcessorTest.java | 4 +-
.../FileSystemPipesIteratorTest.java} | 6 +-
...erator-config.xml => pipes-iterator-config.xml} | 14 ++--
...nfig.xml => pipes-iterator-multiple-config.xml} | 26 ++++---
tika-pipes/pom.xml | 2 +-
.../apache/tika/pipes/PipeIntegrationTests.java | 24 +++----
.../src/test/resources/tika-config-s3ToFs.xml | 8 +--
.../src/test/resources/tika-config-s3Tos3.xml | 8 +--
.../pom.xml | 8 +--
.../tika-pipes-iterator-csv}/pom.xml | 4 +-
.../pipes/pipesiterator/csv/CSVPipesIterator.java} | 8 +--
.../src/test/java/TestCSVPipesIterator.java} | 12 ++--
.../src/test/resources/test-simple.csv | 0
.../tika-pipes-iterator-jdbc}/pom.xml | 4 +-
.../pipesiterator/jdbc/JDBCPipesIterator.java} | 8 +--
.../pipesiterator/jdbc/TestJDBCPipesIterator.java} | 32 ++++-----
.../src/test/resources/log4j.properties | 0
.../tika-pipes-iterator-s3}/pom.xml | 4 +-
.../pipes/pipesiterator/s3/S3PipesIterator.java} | 10 +--
.../pipesiterator/s3/TestS3PipesIterator.java} | 12 ++--
.../src/test/resources/log4j.properties | 0
.../apache/tika/server/client/TikaClientCLI.java | 25 ++++---
.../resources/tika-config-simple-fs-emitter.xml | 8 +--
29 files changed, 207 insertions(+), 220 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index f8b6791..e332807 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -43,6 +43,52 @@ import org.apache.tika.utils.XMLReaderUtils;
public abstract class ConfigBase {
/**
+ * Use this to build a single class, where the user specifies the instance class, e.g.
+ * PipesIterator
+ *
+ * @param itemName
+ * @param is
+ * @throws TikaConfigException
+ * @throws IOException
+ */
+ protected static <T> T buildSingle(String itemName, Class<T> itemClass, InputStream is)
+ throws TikaConfigException, IOException {
+ Node properties = null;
+ try {
+ properties = XMLReaderUtils.buildDOM(is).getDocumentElement();
+ } catch (SAXException e) {
+ throw new IOException(e);
+ } catch (TikaException e) {
+ throw new TikaConfigException("problem loading xml to dom", e);
+ }
+ if (!properties.getLocalName().equals("properties")) {
+ throw new TikaConfigException("expect properties as root node");
+ }
+ NodeList children = properties.getChildNodes();
+ T toReturn = null;
+ for (int i = 0; i < children.getLength(); i++) {
+ Node child = children.item(i);
+ if (child.getNodeType() != 1) {
+ continue;
+ }
+ if (itemName.equals(child.getLocalName())) {
+ if (toReturn != null) {
+ throw new TikaConfigException("There can only be one " + itemName +
+ " in a config");
+ }
+ T item = buildClass(child, itemName, itemClass);
+ setParams(item, child, new HashSet<>());
+ toReturn = (T)item;
+ }
+ }
+ if (toReturn == null) {
+ throw new TikaConfigException("could not find " + itemName);
+ }
+ return toReturn;
+ }
+
+
+ /**
* Use this to build a list of components for a composite item (e.g.
* CompositeMetadataFilter, FetcherManager), each with their own configurations
*
@@ -224,9 +270,27 @@ public abstract class ConfigBase {
Node n = nodeList.item(i);
if (n.getNodeType() == 1) {
NamedNodeMap m = n.getAttributes();
- String from = m.getNamedItem("from").getTextContent();
- String to = m.getNamedItem("to").getTextContent();
- map.put(from, to);
+ String key = null;
+ String value = null;
+ if (m.getNamedItem("from") != null) {
+ key = m.getNamedItem("from").getTextContent();
+ } else if (m.getNamedItem("key") != null) {
+ key = m.getNamedItem("key").getTextContent();
+ }
+ if (m.getNamedItem("to") != null) {
+ value = m.getNamedItem("to").getTextContent();
+ } else if (m.getNamedItem("value") != null) {
+ value = m.getNamedItem("value").getTextContent();
+ }
+ if (key == null) {
+ throw new TikaConfigException("must specify a 'key' or 'from' value in a map " +
+ "object : " + param);
+ }
+ if (value == null) {
+ throw new TikaConfigException("must specify a 'value' or 'to' value in a " +
+ "map object : " + param);
+ }
+ map.put(key, value);
}
}
@@ -315,16 +379,6 @@ public abstract class ConfigBase {
"Couldn't find setter: " + setter + " for object " +
object.getClass());
}
- private static List<String> loadStringList(String itemName, NodeList nodelist) {
- List<String> list = new ArrayList<>();
- for (int i = 0; i < nodelist.getLength(); i++) {
- Node n = nodelist.item(i);
- if (itemName.equals(n.getLocalName())) {
- list.add(n.getTextContent());
- }
- }
- return list;
- }
/**
* This should be overridden to do something with the settings
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 0d66074..5453581 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -38,7 +38,7 @@ import org.apache.tika.pipes.PipesException;
import org.apache.tika.pipes.PipesResult;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
/**
* This is the main class for handling async requests. This manages
@@ -174,7 +174,7 @@ public class AsyncProcessor implements Closeable {
FetchEmitTuple t = fetchEmitTuples.poll(1,
TimeUnit.SECONDS);
if (t == null) {
//skip
- } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ } else if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return PARSER_FUTURE_CODE;
} else {
PipesResult result = null;
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
deleted file mode 100644
index 41f7460..0000000
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.fetchiterator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tika.config.ConfigBase;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-
-/**
- * This class currently only supports one fetch iterator at a time.
- */
-public class FetchIteratorManager extends ConfigBase implements Initializable {
-
- public static FetchIteratorManager build(Path tikaConfigFile) throws
IOException,
- TikaConfigException {
- try (InputStream is = Files.newInputStream(tikaConfigFile)) {
- return buildComposite("fetchIterators",
FetchIteratorManager.class, "fetchIterator",
- FetchIterator.class, is);
- }
- }
-
- private List<FetchIterator> fetchIterators = new ArrayList<>();
- public FetchIteratorManager(List<FetchIterator> fetchIterators) {
- this.fetchIterators = fetchIterators;
- }
-
- public FetchIterator getFetchIterator() {
- return fetchIterators.get(0);
- }
-
- @Override
- public void initialize(Map<String, Param> params) throws
TikaConfigException {
-
- }
-
- @Override
- public void checkInitialization(InitializableProblemHandler
problemHandler) throws TikaConfigException {
- if (fetchIterators.size() == 0) {
- throw new TikaConfigException("must be at least one fetch
iterator");
- }
- if (fetchIterators.size() > 1) {
- throw new TikaConfigException("Sorry, we currently only support a
single fetch iterator");
- }
- }
-}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
similarity index 95%
rename from
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
rename to
tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
index aef095d..ce0f208 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@@ -35,14 +35,14 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-public class FileSystemFetchIterator extends FetchIterator implements
Initializable {
+public class FileSystemPipesIterator extends PipesIterator implements
Initializable {
private Path basePath;
- public FileSystemFetchIterator() {
+ public FileSystemPipesIterator() {
}
- public FileSystemFetchIterator(Path basePath) {
+ public FileSystemPipesIterator(Path basePath) {
this.basePath = basePath;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
similarity index 93%
rename from
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
rename to
tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
index 490a77b..dfb5bf4 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
@@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
@@ -29,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.tika.config.ConfigBase;
import org.apache.tika.config.Field;
import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
@@ -46,8 +50,8 @@ import org.apache.tika.sax.BasicContentHandlerFactory;
* a RuntimeException. It will throw an IllegalStateException if
* next() is called after hasNext() has returned false.
*/
-public abstract class FetchIterator
- implements Callable<Integer>, Iterable<FetchEmitTuple>, Initializable {
+public abstract class PipesIterator extends ConfigBase
+ implements Callable<Integer>, Iterable<FetchEmitTuple>, Initializable {
public static final long DEFAULT_MAX_WAIT_MS = 300_000;
public static final int DEFAULT_QUEUE_SIZE = 1000;
@@ -55,7 +59,7 @@ public abstract class FetchIterator
public static final FetchEmitTuple COMPLETED_SEMAPHORE =
new FetchEmitTuple(null,null, null, null, null, null);
- private static final Logger LOGGER =
LoggerFactory.getLogger(FetchIterator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipesIterator.class);
private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
private ArrayBlockingQueue<FetchEmitTuple> queue = null;
@@ -72,6 +76,15 @@ public abstract class FetchIterator
private int added = 0;
private FutureTask<Integer> futureTask;
+ public static PipesIterator build(Path tikaConfigFile) throws IOException,
+ TikaConfigException {
+ try (InputStream is = Files.newInputStream(tikaConfigFile)) {
+ return buildSingle(
+ "pipesIterator",
+ PipesIterator.class, is);
+ }
+ }
+
public String getFetcherName() {
return fetcherName;
}
diff --git
a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
index 80513da..7777278 100644
--- a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
+++ b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
@@ -29,7 +29,7 @@ import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
import org.apache.tika.pipes.fetcher.FileSystemFetcher;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
public class TikaPipesConfigTest extends AbstractTikaConfigTest {
//this handles tests for the newer pipes type configs.
@@ -78,17 +78,17 @@ public class TikaPipesConfigTest extends
AbstractTikaConfigTest {
}
@Test
- public void testFetchIterator() throws Exception {
- FetchIteratorManager fim =
-
FetchIteratorManager.build(getConfigFilePath("fetch-iterator-config.xml"));
- assertEquals("fs1", fim.getFetchIterator().getFetcherName());
+ public void testPipesIterator() throws Exception {
+ PipesIterator it =
+
PipesIterator.build(getConfigFilePath("pipes-iterator-config.xml"));
+ assertEquals("fs1", it.getFetcherName());
}
@Test(expected = TikaConfigException.class)
- public void testMultipleFetchIterators() throws Exception {
- FetchIteratorManager fim =
-
FetchIteratorManager.build(getConfigFilePath("fetch-iterator-multiple-config.xml"));
- assertEquals("fs1", fim.getFetchIterator().getFetcherName());
+ public void testMultiplePipesIterators() throws Exception {
+ PipesIterator it =
+
PipesIterator.build(getConfigFilePath("pipes-iterator-multiple-config.xml"));
+ assertEquals("fs1", it.getFetcherName());
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 6a82c6f..3040d3a 100644
---
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -37,7 +37,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.utils.ProcessUtils;
public class AsyncProcessorTest {
@@ -101,7 +101,7 @@ public class AsyncProcessorTest {
processor.offer(t, 1000);
}
for (int i = 0; i < 10; i++) {
- processor.offer(FetchIterator.COMPLETED_SEMAPHORE, 1000);
+ processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
}
//TODO clean this up
while (processor.checkActive()) {
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
similarity index 93%
rename from
tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
rename to
tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
index c87e3d0..7efeb8a 100644
---
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++
b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
import static org.junit.Assert.assertEquals;
@@ -33,7 +33,7 @@ import org.junit.Test;
import org.apache.tika.pipes.FetchEmitTuple;
-public class FileSystemFetchIteratorTest {
+public class FileSystemPipesIteratorTest {
public static List<Path> listFiles(Path path) throws IOException {
@@ -57,7 +57,7 @@ public class FileSystemFetchIteratorTest {
}
String fetcherName = "fs";
- FetchIterator it = new FileSystemFetchIterator(root);
+ PipesIterator it = new FileSystemPipesIterator(root);
it.setFetcherName(fetcherName);
it.setQueueSize(2);
diff --git
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
similarity index 76%
rename from
tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml
rename to
tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
index 48d24fe..3f3fa51 100644
---
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml
+++
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
@@ -16,12 +16,10 @@
limitations under the License.
-->
<properties>
- <fetchIterators>
- <fetchIterator
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
- <params>
- <fetcherName>fs1</fetcherName>
- <basePath>/my/base/path1</basePath>
- </params>
- </fetchIterator>
- </fetchIterators>
+ <pipesIterator
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+ <params>
+ <fetcherName>fs1</fetcherName>
+ <basePath>/my/base/path1</basePath>
+ </params>
+ </pipesIterator>
</properties>
diff --git
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
similarity index 63%
rename from
tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
rename to
tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
index 3e4352d..c43a518 100644
---
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
+++
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
@@ -16,18 +16,16 @@
limitations under the License.
-->
<properties>
- <fetchIterators>
- <fetchIterator
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
- <params>
- <fetcherName>fs1</fetcherName>
- <basePath>/my/base/path1</basePath>
- </params>
- </fetchIterator>
- <fetchIterator
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
- <params>
- <fetcherName>fs2</fetcherName>
- <basePath>/my/base/path2</basePath>
- </params>
- </fetchIterator>
- </fetchIterators>
+ <pipesIterator
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+ <params>
+ <fetcherName>fs1</fetcherName>
+ <basePath>/my/base/path1</basePath>
+ </params>
+ </pipesIterator>
+ <pipesIterator
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+ <params>
+ <fetcherName>fs2</fetcherName>
+ <basePath>/my/base/path2</basePath>
+ </params>
+ </pipesIterator>
</properties>
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 8614090..8f79838 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -35,7 +35,7 @@
<module>tika-httpclient-commons</module>
<module>tika-fetchers</module>
<module>tika-emitters</module>
- <module>tika-fetch-iterators</module>
+ <module>tika-pipes-iterators</module>
<module>tika-pipes-integration-tests</module>
</modules>
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 6b74aea..7cad1da 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -48,8 +48,7 @@ import org.apache.tika.pipes.emitter.EmitterManager;
import org.apache.tika.pipes.emitter.s3.S3Emitter;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
@Ignore("turn these into actual tests")
public class PipeIntegrationTests {
@@ -89,7 +88,7 @@ public class PipeIntegrationTests {
@Test
public void testS3ToFS() throws Exception {
Fetcher fetcher = getFetcher("tika-config-s3ToFs.xml", "s3f");
- FetchIterator fetchIterator =
getFetchIterator("tika-config-s3ToFs.xml");
+ PipesIterator pipesIterator =
getPipesIterator("tika-config-s3ToFs.xml");
int numConsumers = 1;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
@@ -98,11 +97,11 @@ public class PipeIntegrationTests {
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new FSFetcherEmitter(queue, fetcher,
null));
}
- for (FetchEmitTuple t : fetchIterator) {
+ for (FetchEmitTuple t : pipesIterator) {
queue.offer(t);
}
for (int i = 0; i < numConsumers; i++) {
- queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+ queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
}
int finished = 0;
try {
@@ -119,7 +118,7 @@ public class PipeIntegrationTests {
public void testS3ToS3() throws Exception {
Fetcher fetcher = getFetcher("tika-config-s3Tos3.xml", "s3f");
Emitter emitter = getEmitter("tika-config-s3Tos3.xml", "s3e");
- FetchIterator fetchIterator =
getFetchIterator("tika-config-s3Tos3.xml");
+ PipesIterator pipesIterator =
getPipesIterator("tika-config-s3Tos3.xml");
int numConsumers = 20;
ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
ExecutorCompletionService<Integer> completionService = new
ExecutorCompletionService<>(es);
@@ -127,11 +126,11 @@ public class PipeIntegrationTests {
for (int i = 0; i < numConsumers; i++) {
completionService.submit(new S3FetcherEmitter(queue, fetcher,
(S3Emitter) emitter));
}
- for (FetchEmitTuple t : fetchIterator) {
+ for (FetchEmitTuple t : pipesIterator) {
queue.offer(t);
}
for (int i = 0; i < numConsumers; i++) {
- queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+ queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
}
int finished = 0;
try {
@@ -154,9 +153,8 @@ public class PipeIntegrationTests {
return manager.getEmitter(emitterName);
}
- private FetchIterator getFetchIterator(String fileName) throws Exception {
- FetchIteratorManager fim =
FetchIteratorManager.build(getPath(fileName));
- return fim.getFetchIterator();
+ private PipesIterator getPipesIterator(String fileName) throws Exception {
+ return PipesIterator.build(getPath(fileName));
}
private Path getPath(String fileName) throws Exception {
@@ -186,7 +184,7 @@ public class PipeIntegrationTests {
if (t == null) {
throw new TimeoutException("");
}
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return 1;
}
process(t);
@@ -228,7 +226,7 @@ public class PipeIntegrationTests {
if (t == null) {
throw new TimeoutException("");
}
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return 1;
}
process(t);
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
index 01387cb..c0be998 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
@@ -27,14 +27,14 @@
</params>
</fetcher>
</fetchers>
- <fetchIterators>
- <fetchIterator
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+ <pipesIterators>
+ <pipesIterator
class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
<params>
<fetcherName>s3</fetcherName>
<bucket><!-- fill in here --></bucket>
<region>us-east-1</region>
<profile><!-- fill in here --></profile>
</params>
- </fetchIterator>
- </fetchIterators>
+ </pipesIterator>
+ </pipesIterators>
</properties>
\ No newline at end of file
diff --git
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
index 57c0a12..3302b96 100644
---
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
+++
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
@@ -28,16 +28,16 @@
</params>
</fetcher>
</fetchers>
- <fetchIterators>
- <fetchIterator
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+ <pipesIterators>
+ <pipesIterator
class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
<params>
<fetcherName>s3f</fetcherName>
<region>us-east-1</region>
<bucket><!-- fill in here --></bucket>
<profile><!-- fill in here --></profile>
</params>
- </fetchIterator>
- </fetchIterators>
+ </pipesIterator>
+ </pipesIterators>
<emitters>
<emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
<params>
diff --git a/tika-pipes/tika-fetch-iterators/pom.xml
b/tika-pipes/tika-pipes-iterators/pom.xml
similarity index 88%
rename from tika-pipes/tika-fetch-iterators/pom.xml
rename to tika-pipes/tika-pipes-iterators/pom.xml
index 49451a4..cad1a08 100644
--- a/tika-pipes/tika-fetch-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -33,11 +33,11 @@
<name>Apache Tika fetch iterators</name>
<url>https://tika.apache.org/</url>
- <!-- see also org.apache.tika.pipes.fetchiterator.FileSystemIterator
+ <!-- see also org.apache.tika.pipes.pipesiterator.FileSystemIterator
in tika-core if you want a file system directory crawler -->
<modules>
- <module>tika-fetch-iterator-csv</module>
- <module>tika-fetch-iterator-jdbc</module>
- <module>tika-fetch-iterator-s3</module>
+ <module>tika-pipes-iterator-csv</module>
+ <module>tika-pipes-iterator-jdbc</module>
+ <module>tika-pipes-iterator-s3</module>
</modules>
</project>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
index f6197a4..a50acb5 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
@@ -28,7 +28,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-fetch-iterator-csv</artifactId>
+ <artifactId>tika-pipes-iterator-csv</artifactId>
<name>Apache Tika Fetch Iterator - CSV</name>
<url>https://tika.apache.org/</url>
@@ -69,7 +69,7 @@
<configuration>
<archive>
<manifestEntries>
-
<Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.csv</Automatic-Module-Name>
+
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.csv</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
similarity index 98%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
index 7799b05..17ea954 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator.csv;
+package org.apache.tika.pipes.pipesiterator.csv;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
@@ -44,7 +44,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.utils.StringUtils;
/**
@@ -75,9 +75,9 @@ import org.apache.tika.utils.StringUtils;
* <li>The 'emitKeyColumn' value is not added to the metadata.</li>
* </ul>
*/
-public class CSVFetchIterator extends FetchIterator implements Initializable {
+public class CSVPipesIterator extends PipesIterator implements Initializable {
- private static final Logger LOGGER =
LoggerFactory.getLogger(CSVFetchIterator.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CSVPipesIterator.class);
private final Charset charset = StandardCharsets.UTF_8;
private Path csvPath;
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
similarity index 92%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
index 31ff496..1293ea6 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-import static
org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
+import static
org.apache.tika.pipes.pipesiterator.PipesIterator.COMPLETED_SEMAPHORE;
import static org.junit.Assert.assertEquals;
import java.nio.file.Path;
@@ -33,15 +33,15 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
+import org.apache.tika.pipes.pipesiterator.csv.CSVPipesIterator;
-public class TestCSVFetchIterator {
+public class TestCSVPipesIterator {
@Test
public void testSimple() throws Exception {
Path p = get("test-simple.csv");
- CSVFetchIterator it = new CSVFetchIterator();
+ CSVPipesIterator it = new CSVPipesIterator();
it.setFetcherName("fsf");
it.setEmitterName("fse");
it.setCsvPath(p);
@@ -86,7 +86,7 @@ public class TestCSVFetchIterator {
@Test(expected = RuntimeException.class)
public void testBadFetchKeyCol() throws Exception {
Path p = get("test-simple.csv");
- CSVFetchIterator it = new CSVFetchIterator();
+ CSVPipesIterator it = new CSVPipesIterator();
it.setFetcherName("fs");
it.setCsvPath(p);
it.setFetchKeyColumn("fetchKeyDoesntExist");
@@ -96,7 +96,7 @@ public class TestCSVFetchIterator {
}
private Path get(String testFileName) throws Exception {
- return Paths.get(TestCSVFetchIterator.class.getResource("/" + testFileName).toURI());
+ return Paths.get(TestCSVPipesIterator.class.getResource("/" + testFileName).toURI());
}
private static class MockFetcher implements Callable<Integer> {
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/resources/test-simple.csv
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/resources/test-simple.csv
similarity index 100%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/resources/test-simple.csv
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/resources/test-simple.csv
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
index 2cdbcd9..50aa604 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
@@ -28,7 +28,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-fetch-iterator-jdbc</artifactId>
+ <artifactId>tika-pipes-iterator-jdbc</artifactId>
<name>Apache Tika Fetch Iterator - jdbc</name>
<url>https://tika.apache.org/</url>
@@ -60,7 +60,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.jdbc</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.jdbc</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
similarity index 98%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
index ac8f2a2..024b99b 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator.jdbc;
+package org.apache.tika.pipes.pipesiterator.jdbc;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
@@ -43,7 +43,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
import org.apache.tika.utils.StringUtils;
/**
@@ -64,10 +64,10 @@ import org.apache.tika.utils.StringUtils;
* <li>The 'emitKeyColumn' value is not added to the metadata.</li>
* </ul>
*/
-public class JDBCFetchIterator extends FetchIterator implements Initializable {
+public class JDBCPipesIterator extends PipesIterator implements Initializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(JDBCFetchIterator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(JDBCPipesIterator.class);
private String idColumn;
private String fetchKeyColumn;
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
similarity index 86%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
index 9de5010..177950d 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator.jdbc;
+package org.apache.tika.pipes.pipesiterator.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -44,10 +44,9 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
-public class TestJDBCFetchIterator {
+public class TestJDBCPipesIterator {
static final String TABLE = "fetchkeys";
static final String db = "mydb";
@@ -57,7 +56,7 @@ public class TestJDBCFetchIterator {
@BeforeClass
public static void setUp() throws Exception {
- DB_DIR = Files.createTempDirectory("tika-jdbc-fetchiterator-test-");
+ DB_DIR = Files.createTempDirectory("tika-jdbc-pipesiterator-test-");
CONNECTION =
DriverManager.getConnection("jdbc:h2:file:" +
@@ -88,10 +87,9 @@ public class TestJDBCFetchIterator {
@Test
public void testSimple() throws Exception {
- FetchIteratorManager fetchIteratorManager = getConfig();
int numConsumers = 5;
- FetchIterator fetchIterator = fetchIteratorManager.getFetchIterator();
+ PipesIterator pipesIterator = getConfig();
ExecutorService es = Executors.newFixedThreadPool(numConsumers);
ExecutorCompletionService<Integer> completionService =
new ExecutorCompletionService<>(es);
@@ -103,13 +101,13 @@ public class TestJDBCFetchIterator {
completionService.submit(mockFetcher);
}
int offered = 0;
- for (FetchEmitTuple t : fetchIterator) {
+ for (FetchEmitTuple t : pipesIterator) {
queue.put(t);
offered++;
}
assertEquals(NUM_ROWS, offered);
for (int i = 0; i < numConsumers; i++) {
- queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+ queue.put(PipesIterator.COMPLETED_SEMAPHORE);
}
int processed = 0;
int completed = 0;
@@ -142,11 +140,10 @@ public class TestJDBCFetchIterator {
assertEquals(NUM_ROWS, cnt);
}
- private FetchIteratorManager getConfig() throws Exception {
+ private PipesIterator getConfig() throws Exception {
String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?><properties>\n" +
- " <fetchIterators>\n" +
- " <fetchIterator " +
- " class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
+ " <pipesIterator " +
+ " class=\"org.apache.tika.pipes.pipesiterator.jdbc.JDBCPipesIterator\">\n" +
" <params>\n" +
" <fetcherName>s3f</fetcherName>\n" +
" <emitterName>s3e</emitterName>\n" +
@@ -160,13 +157,12 @@ public class TestJDBCFetchIterator {
" <connection>jdbc:h2:file:" +
DB_DIR.toAbsolutePath() + "/" +
db + "</connection>\n" +
" </params>\n" +
- " </fetchIterator>\n" +
- " </fetchIterators>\n" +
+ " </pipesIterator>\n" +
"</properties>";
Path tmp = Files.createTempFile("tika-jdbc-", ".xml");
Files.write(tmp, config.getBytes(StandardCharsets.UTF_8));
- FetchIteratorManager manager =
- FetchIteratorManager.build(tmp);
+ PipesIterator manager =
+ PipesIterator.build(tmp);
Files.delete(tmp);
return manager;
}
@@ -183,7 +179,7 @@ public class TestJDBCFetchIterator {
public Integer call() throws Exception {
while (true) {
FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return pairs.size();
}
pairs.add(t);
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
similarity index 100%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
index 8d18aca..b6f7473 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
@@ -28,7 +28,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>tika-fetch-iterator-s3</artifactId>
+ <artifactId>tika-pipes-iterator-s3</artifactId>
<name>Apache Tika Fetch Iterator - S3</name>
<url>https://tika.apache.org/</url>
@@ -97,7 +97,7 @@
<configuration>
<archive>
<manifestEntries>
- <Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.s3</Automatic-Module-Name>
+ <Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
similarity index 95%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
index 11a7dd0..69c089a 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator.s3;
+package org.apache.tika.pipes.pipesiterator.s3;
import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
@@ -43,11 +43,11 @@ import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.HandlerConfig;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
-public class S3FetchIterator extends FetchIterator implements Initializable {
+public class S3PipesIterator extends PipesIterator implements Initializable {
- private static final Logger LOGGER = LoggerFactory.getLogger(S3FetchIterator.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3PipesIterator.class);
private String prefix = "";
private String region;
private String credentialsProvider;
@@ -109,7 +109,7 @@ public class S3FetchIterator extends FetchIterator implements Initializable {
s3Client =
AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
.build();
} catch (AmazonClientException e) {
- throw new TikaConfigException("can't initialize s3 fetchiterator");
+ throw new TikaConfigException("can't initialize s3 pipesiterator");
}
}
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
similarity index 90%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
index debed19..c647ea6 100644
---
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.tika.pipes.fetchiterator.s3;
+package org.apache.tika.pipes.pipesiterator.s3;
import static org.junit.Assert.assertEquals;
@@ -33,15 +33,15 @@ import org.junit.Ignore;
import org.junit.Test;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
@Ignore("turn into an actual unit test")
-public class TestS3FetchIterator {
+public class TestS3PipesIterator {
@Test
public void testSimple() throws Exception {
- S3FetchIterator it = new S3FetchIterator();
+ S3PipesIterator it = new S3PipesIterator();
it.setFetcherName("s3");
it.setBucket("");//find one
it.setProfile("");//use one
@@ -62,7 +62,7 @@ public class TestS3FetchIterator {
queue.offer(t);
}
for (int i = 0; i < numConsumers; i++) {
- queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+ queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
}
int finished = 0;
int completed = 0;
@@ -90,7 +90,7 @@ public class TestS3FetchIterator {
public Integer call() throws Exception {
while (true) {
FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return pairs.size();
}
pairs.add(t);
diff --git
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
similarity index 100%
rename from
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
rename to
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
diff --git
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 5329fae..9803b80 100644
---
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -40,8 +40,7 @@ import org.xml.sax.SAXException;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
public class TikaClientCLI {
@@ -66,12 +65,12 @@ public class TikaClientCLI {
ExecutorService executorService =
Executors.newFixedThreadPool(numThreads + 1);
ExecutorCompletionService<Integer> completionService =
new ExecutorCompletionService<>(executorService);
- final FetchIterator fetchIterator =
- FetchIteratorManager.build(tikaConfigPath).getFetchIterator();
+ final PipesIterator pipesIterator =
+ PipesIterator.build(tikaConfigPath);
final ArrayBlockingQueue<FetchEmitTuple> queue =
new ArrayBlockingQueue<>(QUEUE_SIZE);
- completionService.submit(new FetchIteratorWrapper(fetchIterator, queue, numThreads));
+ completionService.submit(new PipesIteratorWrapper(pipesIterator, queue, numThreads));
if (tikaServerUrls.size() == numThreads) {
logDiffSizes(tikaServerUrls.size(), numThreads);
for (int i = 0; i < numThreads; i++) {
@@ -133,7 +132,7 @@ public class TikaClientCLI {
send(localCache);
throw new TimeoutException("exceeded maxWaitMs");
}
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
send(localCache);
return 1;
}
@@ -169,7 +168,7 @@ public class TikaClientCLI {
if (t == null) {
throw new TimeoutException("exceeded maxWaitMs");
}
- if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+ if (t == PipesIterator.COMPLETED_SEMAPHORE) {
return 1;
}
try {
@@ -182,15 +181,15 @@ public class TikaClientCLI {
}
}
- private class FetchIteratorWrapper implements Callable<Integer> {
- private final FetchIterator fetchIterator;
+ private class PipesIteratorWrapper implements Callable<Integer> {
+ private final PipesIterator pipesIterator;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final int numThreads;
- public FetchIteratorWrapper(FetchIterator fetchIterator,
+ public PipesIteratorWrapper(PipesIterator pipesIterator,
ArrayBlockingQueue<FetchEmitTuple> queue,
int numThreads) {
- this.fetchIterator = fetchIterator;
+ this.pipesIterator = pipesIterator;
this.queue = queue;
this.numThreads = numThreads;
@@ -198,12 +197,12 @@ public class TikaClientCLI {
@Override
public Integer call() throws Exception {
- for (FetchEmitTuple t : fetchIterator) {
+ for (FetchEmitTuple t : pipesIterator) {
//potentially blocks forever
queue.put(t);
}
for (int i = 0; i < numThreads; i ++) {
- queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+ queue.put(PipesIterator.COMPLETED_SEMAPHORE);
}
return 1;
}
diff --git
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 0990813..0b029d8 100644
---
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -19,14 +19,14 @@
-->
<properties>
<service-loader initializableProblemHandler="throw"/>
- <fetchIterators>
- <fetchIterator class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
+ <pipesIterators>
+ <pipesIterator class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
<params>
<fetcherName>fs</fetcherName>
<basePath>fix</basePath>
</params>
- </fetchIterator>
- </fetchIterators>
+ </pipesIterator>
+ </pipesIterators>
<fetchers>
<fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
<params>