This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 0fa9adc  TIKA-3391 refactor fetchiterators to pipes iterators
0fa9adc is described below

commit 0fa9adc6e88eb56dc9401b512790e685c660106a
Author: tallison <[email protected]>
AuthorDate: Tue May 11 10:44:46 2021 -0400

    TIKA-3391 refactor fetchiterators to pipes iterators
---
 .../java/org/apache/tika/config/ConfigBase.java    | 80 ++++++++++++++++++----
 .../apache/tika/pipes/async/AsyncProcessor.java    |  4 +-
 .../pipes/fetchiterator/FetchIteratorManager.java  | 69 -------------------
 .../FileSystemPipesIterator.java}                  |  8 +--
 .../PipesIterator.java}                            | 21 ++++--
 .../apache/tika/config/TikaPipesConfigTest.java    | 18 ++---
 .../tika/pipes/async/AsyncProcessorTest.java       |  4 +-
 .../FileSystemPipesIteratorTest.java}              |  6 +-
 ...erator-config.xml => pipes-iterator-config.xml} | 14 ++--
 ...nfig.xml => pipes-iterator-multiple-config.xml} | 26 ++++---
 tika-pipes/pom.xml                                 |  2 +-
 .../apache/tika/pipes/PipeIntegrationTests.java    | 24 +++----
 .../src/test/resources/tika-config-s3ToFs.xml      |  8 +--
 .../src/test/resources/tika-config-s3Tos3.xml      |  8 +--
 .../pom.xml                                        |  8 +--
 .../tika-pipes-iterator-csv}/pom.xml               |  4 +-
 .../pipes/pipesiterator/csv/CSVPipesIterator.java} |  8 +--
 .../src/test/java/TestCSVPipesIterator.java}       | 12 ++--
 .../src/test/resources/test-simple.csv             |  0
 .../tika-pipes-iterator-jdbc}/pom.xml              |  4 +-
 .../pipesiterator/jdbc/JDBCPipesIterator.java}     |  8 +--
 .../pipesiterator/jdbc/TestJDBCPipesIterator.java} | 32 ++++-----
 .../src/test/resources/log4j.properties            |  0
 .../tika-pipes-iterator-s3}/pom.xml                |  4 +-
 .../pipes/pipesiterator/s3/S3PipesIterator.java}   | 10 +--
 .../pipesiterator/s3/TestS3PipesIterator.java}     | 12 ++--
 .../src/test/resources/log4j.properties            |  0
 .../apache/tika/server/client/TikaClientCLI.java   | 25 ++++---
 .../resources/tika-config-simple-fs-emitter.xml    |  8 +--
 29 files changed, 207 insertions(+), 220 deletions(-)

diff --git a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java 
b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
index f8b6791..e332807 100644
--- a/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
+++ b/tika-core/src/main/java/org/apache/tika/config/ConfigBase.java
@@ -43,6 +43,52 @@ import org.apache.tika.utils.XMLReaderUtils;
 public abstract class ConfigBase {
 
     /**
+     * Use this to build a single class, where the user specifies the instance 
class, e.g.
+     * PipesIterator
+     *
+     * @param itemName
+     * @param is
+     * @throws TikaConfigException
+     * @throws IOException
+     */
+    protected static <T> T buildSingle(String itemName, Class<T> itemClass, 
InputStream is)
+            throws TikaConfigException, IOException {
+        Node properties = null;
+        try {
+            properties = XMLReaderUtils.buildDOM(is).getDocumentElement();
+        } catch (SAXException e) {
+            throw new IOException(e);
+        } catch (TikaException e) {
+            throw new TikaConfigException("problem loading xml to dom", e);
+        }
+        if (!properties.getLocalName().equals("properties")) {
+            throw new TikaConfigException("expect properties as root node");
+        }
+        NodeList children = properties.getChildNodes();
+        T toReturn = null;
+        for (int i = 0; i < children.getLength(); i++) {
+            Node child = children.item(i);
+            if (child.getNodeType() != 1) {
+                continue;
+            }
+            if (itemName.equals(child.getLocalName())) {
+                if (toReturn != null) {
+                    throw new TikaConfigException("There can only be one " + 
itemName +
+                            " in a config");
+                }
+                T item = buildClass(child, itemName, itemClass);
+                setParams(item, child, new HashSet<>());
+                toReturn = (T)item;
+            }
+        }
+        if (toReturn == null) {
+            throw new TikaConfigException("could not find " + itemName);
+        }
+        return toReturn;
+    }
+
+
+    /**
      * Use this to build a list of components for a composite item (e.g.
      * CompositeMetadataFilter, FetcherManager), each with their own 
configurations
      *
@@ -224,9 +270,27 @@ public abstract class ConfigBase {
             Node n = nodeList.item(i);
             if (n.getNodeType() == 1) {
                 NamedNodeMap m = n.getAttributes();
-                String from = m.getNamedItem("from").getTextContent();
-                String to = m.getNamedItem("to").getTextContent();
-                map.put(from, to);
+                String key = null;
+                String value = null;
+                if (m.getNamedItem("from") != null) {
+                    key = m.getNamedItem("from").getTextContent();
+                } else if (m.getNamedItem("key") != null) {
+                    key = m.getNamedItem("key").getTextContent();
+                }
+                if (m.getNamedItem("to") != null) {
+                    value = m.getNamedItem("to").getTextContent();
+                } else if (m.getNamedItem("value") != null) {
+                    value = m.getNamedItem("value").getTextContent();
+                }
+                if (key == null) {
+                    throw new TikaConfigException("must specify a 'key' or 
'from' value in a map " +
+                            "object : " + param);
+                }
+                if (value == null) {
+                    throw new TikaConfigException("must specify a 'value' or 
'to' value in a " +
+                            "map object : " + param);
+                }
+                map.put(key, value);
             }
 
         }
@@ -315,16 +379,6 @@ public abstract class ConfigBase {
             "Couldn't find setter: " + setter + " for object " + 
object.getClass());
     }
 
-    private static List<String> loadStringList(String itemName, NodeList 
nodelist) {
-        List<String> list = new ArrayList<>();
-        for (int i = 0; i < nodelist.getLength(); i++) {
-            Node n = nodelist.item(i);
-            if (itemName.equals(n.getLocalName())) {
-                list.add(n.getTextContent());
-            }
-        }
-        return list;
-    }
 
     /**
      * This should be overridden to do something with the settings
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java 
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
index 0d66074..5453581 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java
@@ -38,7 +38,7 @@ import org.apache.tika.pipes.PipesException;
 import org.apache.tika.pipes.PipesResult;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitterManager;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 /**
  * This is the main class for handling async requests. This manages
@@ -174,7 +174,7 @@ public class AsyncProcessor implements Closeable {
                     FetchEmitTuple t = fetchEmitTuples.poll(1, 
TimeUnit.SECONDS);
                     if (t == null) {
                         //skip
-                    } else if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                    } else if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                         return PARSER_FUTURE_CODE;
                     } else {
                         PipesResult result = null;
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
deleted file mode 100644
index 41f7460..0000000
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIteratorManager.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.fetchiterator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tika.config.ConfigBase;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-
-/**
- * This class currently only supports one fetch iterator at a time.
- */
-public class FetchIteratorManager extends ConfigBase implements Initializable {
-
-    public static FetchIteratorManager build(Path tikaConfigFile) throws 
IOException,
-            TikaConfigException {
-        try (InputStream is = Files.newInputStream(tikaConfigFile)) {
-            return buildComposite("fetchIterators", 
FetchIteratorManager.class, "fetchIterator",
-                    FetchIterator.class, is);
-        }
-    }
-
-    private List<FetchIterator> fetchIterators = new ArrayList<>();
-    public FetchIteratorManager(List<FetchIterator> fetchIterators) {
-        this.fetchIterators = fetchIterators;
-    }
-
-    public FetchIterator getFetchIterator() {
-        return fetchIterators.get(0);
-    }
-
-    @Override
-    public void initialize(Map<String, Param> params) throws 
TikaConfigException {
-
-    }
-
-    @Override
-    public void checkInitialization(InitializableProblemHandler 
problemHandler) throws TikaConfigException {
-        if (fetchIterators.size() == 0) {
-            throw new TikaConfigException("must be at least one fetch 
iterator");
-        }
-        if (fetchIterators.size() > 1) {
-            throw new TikaConfigException("Sorry, we currently only support a 
single fetch iterator");
-        }
-    }
-}
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
similarity index 95%
rename from 
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
rename to 
tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
index aef095d..ce0f208 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
 
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -35,14 +35,14 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
 
-public class FileSystemFetchIterator extends FetchIterator implements 
Initializable {
+public class FileSystemPipesIterator extends PipesIterator implements 
Initializable {
 
     private Path basePath;
 
-    public FileSystemFetchIterator() {
+    public FileSystemPipesIterator() {
     }
 
-    public FileSystemFetchIterator(Path basePath) {
+    public FileSystemPipesIterator(Path basePath) {
         this.basePath = basePath;
     }
 
diff --git 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
 
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
similarity index 93%
rename from 
tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
rename to 
tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
index 490a77b..dfb5bf4 100644
--- 
a/tika-core/src/main/java/org/apache/tika/pipes/fetchiterator/FetchIterator.java
+++ 
b/tika-core/src/main/java/org/apache/tika/pipes/pipesiterator/PipesIterator.java
@@ -14,9 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -29,6 +32,7 @@ import java.util.concurrent.TimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.config.ConfigBase;
 import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
@@ -46,8 +50,8 @@ import org.apache.tika.sax.BasicContentHandlerFactory;
  * a RuntimeException.  It will throw an IllegalStateException if
  * next() is called after hasNext() has returned false.
  */
-public abstract class FetchIterator
-        implements Callable<Integer>, Iterable<FetchEmitTuple>, Initializable {
+public abstract class PipesIterator extends ConfigBase
+        implements Callable<Integer>, Iterable<FetchEmitTuple>, Initializable  
{
 
     public static final long DEFAULT_MAX_WAIT_MS = 300_000;
     public static final int DEFAULT_QUEUE_SIZE = 1000;
@@ -55,7 +59,7 @@ public abstract class FetchIterator
     public static final FetchEmitTuple COMPLETED_SEMAPHORE =
             new FetchEmitTuple(null,null, null, null, null, null);
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FetchIterator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PipesIterator.class);
 
     private long maxWaitMs = DEFAULT_MAX_WAIT_MS;
     private ArrayBlockingQueue<FetchEmitTuple> queue = null;
@@ -72,6 +76,15 @@ public abstract class FetchIterator
     private int added = 0;
     private FutureTask<Integer> futureTask;
 
+    public static PipesIterator build(Path tikaConfigFile) throws IOException,
+            TikaConfigException {
+        try (InputStream is = Files.newInputStream(tikaConfigFile)) {
+            return buildSingle(
+                    "pipesIterator",
+                    PipesIterator.class, is);
+        }
+    }
+
     public String getFetcherName() {
         return fetcherName;
     }
diff --git 
a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java 
b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
index 80513da..7777278 100644
--- a/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
+++ b/tika-core/src/test/java/org/apache/tika/config/TikaPipesConfigTest.java
@@ -29,7 +29,7 @@ import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
 import org.apache.tika.pipes.fetcher.FileSystemFetcher;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 public class TikaPipesConfigTest extends AbstractTikaConfigTest {
     //this handles tests for the newer pipes type configs.
@@ -78,17 +78,17 @@ public class TikaPipesConfigTest extends 
AbstractTikaConfigTest {
     }
 
     @Test
-    public void testFetchIterator() throws Exception {
-        FetchIteratorManager fim =
-                
FetchIteratorManager.build(getConfigFilePath("fetch-iterator-config.xml"));
-        assertEquals("fs1", fim.getFetchIterator().getFetcherName());
+    public void testPipesIterator() throws Exception {
+        PipesIterator it =
+                
PipesIterator.build(getConfigFilePath("pipes-iterator-config.xml"));
+        assertEquals("fs1", it.getFetcherName());
     }
 
     @Test(expected = TikaConfigException.class)
-    public void testMultipleFetchIterators() throws Exception {
-        FetchIteratorManager fim =
-                
FetchIteratorManager.build(getConfigFilePath("fetch-iterator-multiple-config.xml"));
-        assertEquals("fs1", fim.getFetchIterator().getFetcherName());
+    public void testMultiplePipesIterators() throws Exception {
+        PipesIterator it =
+                
PipesIterator.build(getConfigFilePath("pipes-iterator-multiple-config.xml"));
+        assertEquals("fs1", it.getFetcherName());
     }
 
 }
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java 
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
index 6a82c6f..3040d3a 100644
--- 
a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
+++ 
b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java
@@ -37,7 +37,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.emitter.EmitData;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 import org.apache.tika.utils.ProcessUtils;
 
 public class AsyncProcessorTest {
@@ -101,7 +101,7 @@ public class AsyncProcessorTest {
             processor.offer(t, 1000);
         }
         for (int i = 0; i < 10; i++) {
-            processor.offer(FetchIterator.COMPLETED_SEMAPHORE, 1000);
+            processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000);
         }
         //TODO clean this up
         while (processor.checkActive()) {
diff --git 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
 
b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
similarity index 93%
rename from 
tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
rename to 
tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
index c87e3d0..7efeb8a 100644
--- 
a/tika-core/src/test/java/org/apache/tika/pipes/fetchiterator/FileSystemFetchIteratorTest.java
+++ 
b/tika-core/src/test/java/org/apache/tika/pipes/pipesiterator/FileSystemPipesIteratorTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator;
+package org.apache.tika.pipes.pipesiterator;
 
 import static org.junit.Assert.assertEquals;
 
@@ -33,7 +33,7 @@ import org.junit.Test;
 import org.apache.tika.pipes.FetchEmitTuple;
 
 
-public class FileSystemFetchIteratorTest {
+public class FileSystemPipesIteratorTest {
 
     public static List<Path> listFiles(Path path) throws IOException {
 
@@ -57,7 +57,7 @@ public class FileSystemFetchIteratorTest {
         }
 
         String fetcherName = "fs";
-        FetchIterator it = new FileSystemFetchIterator(root);
+        PipesIterator it = new FileSystemPipesIterator(root);
         it.setFetcherName(fetcherName);
         it.setQueueSize(2);
 
diff --git 
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml 
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
similarity index 76%
rename from 
tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml
rename to 
tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
index 48d24fe..3f3fa51 100644
--- 
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-config.xml
+++ 
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
@@ -16,12 +16,10 @@
   limitations under the License.
 -->
 <properties>
-  <fetchIterators>
-    <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
-      <params>
-        <fetcherName>fs1</fetcherName>
-        <basePath>/my/base/path1</basePath>
-      </params>
-    </fetchIterator>
-  </fetchIterators>
+  <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+    <params>
+      <fetcherName>fs1</fetcherName>
+      <basePath>/my/base/path1</basePath>
+    </params>
+  </pipesIterator>
 </properties>
diff --git 
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
 
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
similarity index 63%
rename from 
tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
rename to 
tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
index 3e4352d..c43a518 100644
--- 
a/tika-core/src/test/resources/org/apache/tika/config/fetch-iterator-multiple-config.xml
+++ 
b/tika-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
@@ -16,18 +16,16 @@
   limitations under the License.
 -->
 <properties>
-  <fetchIterators>
-    <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
-      <params>
-        <fetcherName>fs1</fetcherName>
-        <basePath>/my/base/path1</basePath>
-      </params>
-    </fetchIterator>
-    <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
-      <params>
-        <fetcherName>fs2</fetcherName>
-        <basePath>/my/base/path2</basePath>
-      </params>
-    </fetchIterator>
-  </fetchIterators>
+  <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+    <params>
+      <fetcherName>fs1</fetcherName>
+      <basePath>/my/base/path1</basePath>
+    </params>
+  </pipesIterator>
+  <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
+    <params>
+      <fetcherName>fs2</fetcherName>
+      <basePath>/my/base/path2</basePath>
+    </params>
+  </pipesIterator>
 </properties>
diff --git a/tika-pipes/pom.xml b/tika-pipes/pom.xml
index 8614090..8f79838 100644
--- a/tika-pipes/pom.xml
+++ b/tika-pipes/pom.xml
@@ -35,7 +35,7 @@
     <module>tika-httpclient-commons</module>
     <module>tika-fetchers</module>
     <module>tika-emitters</module>
-    <module>tika-fetch-iterators</module>
+    <module>tika-pipes-iterators</module>
     <module>tika-pipes-integration-tests</module>
   </modules>
 
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
index 6b74aea..7cad1da 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/java/org/apache/tika/pipes/PipeIntegrationTests.java
@@ -48,8 +48,7 @@ import org.apache.tika.pipes.emitter.EmitterManager;
 import org.apache.tika.pipes.emitter.s3.S3Emitter;
 import org.apache.tika.pipes.fetcher.Fetcher;
 import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 @Ignore("turn these into actual tests")
 public class PipeIntegrationTests {
@@ -89,7 +88,7 @@ public class PipeIntegrationTests {
     @Test
     public void testS3ToFS() throws Exception {
         Fetcher fetcher = getFetcher("tika-config-s3ToFs.xml", "s3f");
-        FetchIterator fetchIterator = 
getFetchIterator("tika-config-s3ToFs.xml");
+        PipesIterator pipesIterator = 
getPipesIterator("tika-config-s3ToFs.xml");
 
         int numConsumers = 1;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
@@ -98,11 +97,11 @@ public class PipeIntegrationTests {
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new FSFetcherEmitter(queue, fetcher, 
null));
         }
-        for (FetchEmitTuple t : fetchIterator) {
+        for (FetchEmitTuple t : pipesIterator) {
             queue.offer(t);
         }
         for (int i = 0; i < numConsumers; i++) {
-            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+            queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
         }
         int finished = 0;
         try {
@@ -119,7 +118,7 @@ public class PipeIntegrationTests {
     public void testS3ToS3() throws Exception {
         Fetcher fetcher = getFetcher("tika-config-s3Tos3.xml", "s3f");
         Emitter emitter = getEmitter("tika-config-s3Tos3.xml", "s3e");
-        FetchIterator fetchIterator = 
getFetchIterator("tika-config-s3Tos3.xml");
+        PipesIterator pipesIterator = 
getPipesIterator("tika-config-s3Tos3.xml");
         int numConsumers = 20;
         ExecutorService es = Executors.newFixedThreadPool(numConsumers + 1);
         ExecutorCompletionService<Integer> completionService = new 
ExecutorCompletionService<>(es);
@@ -127,11 +126,11 @@ public class PipeIntegrationTests {
         for (int i = 0; i < numConsumers; i++) {
             completionService.submit(new S3FetcherEmitter(queue, fetcher, 
(S3Emitter) emitter));
         }
-        for (FetchEmitTuple t : fetchIterator) {
+        for (FetchEmitTuple t : pipesIterator) {
             queue.offer(t);
         }
         for (int i = 0; i < numConsumers; i++) {
-            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+            queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
         }
         int finished = 0;
         try {
@@ -154,9 +153,8 @@ public class PipeIntegrationTests {
         return manager.getEmitter(emitterName);
     }
 
-    private FetchIterator getFetchIterator(String fileName) throws Exception {
-        FetchIteratorManager fim = 
FetchIteratorManager.build(getPath(fileName));
-        return fim.getFetchIterator();
+    private PipesIterator getPipesIterator(String fileName) throws Exception {
+        return PipesIterator.build(getPath(fileName));
     }
 
     private Path getPath(String fileName) throws Exception {
@@ -186,7 +184,7 @@ public class PipeIntegrationTests {
                 if (t == null) {
                     throw new TimeoutException("");
                 }
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
                 process(t);
@@ -228,7 +226,7 @@ public class PipeIntegrationTests {
                 if (t == null) {
                     throw new TimeoutException("");
                 }
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
                 process(t);
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
index 01387cb..c0be998 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3ToFs.xml
@@ -27,14 +27,14 @@
             </params>
         </fetcher>
     </fetchers>
-    <fetchIterators>
-        <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+    <pipesIterators>
+        <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
             <params>
                 <fetcherName>s3</fetcherName>
                 <bucket><!-- fill in here --></bucket>
                 <region>us-east-1</region>
                 <profile><!-- fill in here --></profile>
             </params>
-        </fetchIterator>
-    </fetchIterators>
+        </pipesIterator>
+    </pipesIterators>
 </properties>
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
index 57c0a12..3302b96 100644
--- 
a/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
+++ 
b/tika-pipes/tika-pipes-integration-tests/src/test/resources/tika-config-s3Tos3.xml
@@ -28,16 +28,16 @@
             </params>
         </fetcher>
     </fetchers>
-    <fetchIterators>
-        <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.s3.S3FetchIterator">
+    <pipesIterators>
+        <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.s3.S3PipesIterator">
             <params>
                 <fetcherName>s3f</fetcherName>
                 <region>us-east-1</region>
                 <bucket><!-- fill in here --></bucket>
                 <profile><!-- fill in here --></profile>
             </params>
-        </fetchIterator>
-    </fetchIterators>
+        </pipesIterator>
+    </pipesIterators>
     <emitters>
         <emitter class="org.apache.tika.pipes.emitter.s3.S3Emitter">
             <params>
diff --git a/tika-pipes/tika-fetch-iterators/pom.xml 
b/tika-pipes/tika-pipes-iterators/pom.xml
similarity index 88%
rename from tika-pipes/tika-fetch-iterators/pom.xml
rename to tika-pipes/tika-pipes-iterators/pom.xml
index 49451a4..cad1a08 100644
--- a/tika-pipes/tika-fetch-iterators/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/pom.xml
@@ -33,11 +33,11 @@
   <name>Apache Tika fetch iterators</name>
   <url>https://tika.apache.org/</url>
 
-  <!-- see also org.apache.tika.pipes.fetchiterator.FileSystemIterator
+  <!-- see also org.apache.tika.pipes.pipesiterator.FileSystemIterator
        in tika-core if you want a file system directory crawler -->
   <modules>
-    <module>tika-fetch-iterator-csv</module>
-    <module>tika-fetch-iterator-jdbc</module>
-    <module>tika-fetch-iterator-s3</module>
+    <module>tika-pipes-iterator-csv</module>
+    <module>tika-pipes-iterator-jdbc</module>
+    <module>tika-pipes-iterator-s3</module>
   </modules>
 </project>
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
index f6197a4..a50acb5 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/pom.xml
@@ -28,7 +28,7 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-fetch-iterator-csv</artifactId>
+  <artifactId>tika-pipes-iterator-csv</artifactId>
 
   <name>Apache Tika Fetch Iterator - CSV</name>
   <url>https://tika.apache.org/</url>
@@ -69,7 +69,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              
<Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.csv</Automatic-Module-Name>
+              
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.csv</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
similarity index 98%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
index 7799b05..17ea954 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/main/java/org/apache/tika/pipes/fetchiterator/csv/CSVFetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator.csv;
+package org.apache.tika.pipes.pipesiterator.csv;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
@@ -44,7 +44,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 import org.apache.tika.utils.StringUtils;
 
 /**
@@ -75,9 +75,9 @@ import org.apache.tika.utils.StringUtils;
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  */
-public class CSVFetchIterator extends FetchIterator implements Initializable {
+public class CSVPipesIterator extends PipesIterator implements Initializable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(CSVFetchIterator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CSVPipesIterator.class);
 
     private final Charset charset = StandardCharsets.UTF_8;
     private Path csvPath;
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
similarity index 92%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
index 31ff496..1293ea6 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/java/TestCSVFetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-import static 
org.apache.tika.pipes.fetchiterator.FetchIterator.COMPLETED_SEMAPHORE;
+import static 
org.apache.tika.pipes.pipesiterator.PipesIterator.COMPLETED_SEMAPHORE;
 import static org.junit.Assert.assertEquals;
 
 import java.nio.file.Path;
@@ -33,15 +33,15 @@ import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.csv.CSVFetchIterator;
+import org.apache.tika.pipes.pipesiterator.csv.CSVPipesIterator;
 
-public class TestCSVFetchIterator {
+public class TestCSVPipesIterator {
 
 
     @Test
     public void testSimple() throws Exception {
         Path p = get("test-simple.csv");
-        CSVFetchIterator it = new CSVFetchIterator();
+        CSVPipesIterator it = new CSVPipesIterator();
         it.setFetcherName("fsf");
         it.setEmitterName("fse");
         it.setCsvPath(p);
@@ -86,7 +86,7 @@ public class TestCSVFetchIterator {
     @Test(expected = RuntimeException.class)
     public void testBadFetchKeyCol() throws Exception {
         Path p = get("test-simple.csv");
-        CSVFetchIterator it = new CSVFetchIterator();
+        CSVPipesIterator it = new CSVPipesIterator();
         it.setFetcherName("fs");
         it.setCsvPath(p);
         it.setFetchKeyColumn("fetchKeyDoesntExist");
@@ -96,7 +96,7 @@ public class TestCSVFetchIterator {
     }
 
     private Path get(String testFileName) throws Exception {
-        return Paths.get(TestCSVFetchIterator.class.getResource("/" + 
testFileName).toURI());
+        return Paths.get(TestCSVPipesIterator.class.getResource("/" + 
testFileName).toURI());
     }
 
     private static class MockFetcher implements Callable<Integer> {
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/resources/test-simple.csv
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/resources/test-simple.csv
similarity index 100%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-csv/src/test/resources/test-simple.csv
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/resources/test-simple.csv
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
index 2cdbcd9..50aa604 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/pom.xml
@@ -28,7 +28,7 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-fetch-iterator-jdbc</artifactId>
+  <artifactId>tika-pipes-iterator-jdbc</artifactId>
 
   <name>Apache Tika Fetch Iterator - jdbc</name>
   <url>https://tika.apache.org/</url>
@@ -60,7 +60,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              
<Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.jdbc</Automatic-Module-Name>
+              
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.jdbc</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
similarity index 98%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
index ac8f2a2..024b99b 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/main/java/org/apache/tika/pipes/fetchiterator/jdbc/JDBCFetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator.jdbc;
+package org.apache.tika.pipes.pipesiterator.jdbc;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
@@ -43,7 +43,7 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 import org.apache.tika.utils.StringUtils;
 
 /**
@@ -64,10 +64,10 @@ import org.apache.tika.utils.StringUtils;
  *      <li>The 'emitKeyColumn' value is not added to the metadata.</li>
  *  </ul>
  */
-public class JDBCFetchIterator extends FetchIterator implements Initializable {
+public class JDBCPipesIterator extends PipesIterator implements Initializable {
 
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(JDBCFetchIterator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JDBCPipesIterator.class);
 
     private String idColumn;
     private String fetchKeyColumn;
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
similarity index 86%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
index 9de5010..177950d 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/java/org/apache/tika/pipes/fetchiterator/jdbc/TestJDBCFetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/java/org/apache/tika/pipes/pipesiterator/jdbc/TestJDBCPipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator.jdbc;
+package org.apache.tika.pipes.pipesiterator.jdbc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -44,10 +44,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
-public class TestJDBCFetchIterator {
+public class TestJDBCPipesIterator {
 
     static final String TABLE = "fetchkeys";
     static final String db = "mydb";
@@ -57,7 +56,7 @@ public class TestJDBCFetchIterator {
 
     @BeforeClass
     public static void setUp() throws Exception {
-        DB_DIR = Files.createTempDirectory("tika-jdbc-fetchiterator-test-");
+        DB_DIR = Files.createTempDirectory("tika-jdbc-pipesiterator-test-");
 
         CONNECTION =
                 DriverManager.getConnection("jdbc:h2:file:" +
@@ -88,10 +87,9 @@ public class TestJDBCFetchIterator {
 
     @Test
     public void testSimple() throws Exception {
-        FetchIteratorManager fetchIteratorManager = getConfig();
         int numConsumers = 5;
 
-        FetchIterator fetchIterator = fetchIteratorManager.getFetchIterator();
+        PipesIterator pipesIterator = getConfig();
         ExecutorService es = Executors.newFixedThreadPool(numConsumers);
         ExecutorCompletionService<Integer> completionService =
                 new ExecutorCompletionService<>(es);
@@ -103,13 +101,13 @@ public class TestJDBCFetchIterator {
             completionService.submit(mockFetcher);
         }
         int offered = 0;
-        for (FetchEmitTuple t : fetchIterator) {
+        for (FetchEmitTuple t : pipesIterator) {
             queue.put(t);
             offered++;
         }
         assertEquals(NUM_ROWS, offered);
         for (int i = 0; i < numConsumers; i++) {
-            queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+            queue.put(PipesIterator.COMPLETED_SEMAPHORE);
         }
         int processed = 0;
         int completed = 0;
@@ -142,11 +140,10 @@ public class TestJDBCFetchIterator {
         assertEquals(NUM_ROWS, cnt);
     }
 
-    private FetchIteratorManager getConfig() throws Exception {
+    private PipesIterator getConfig() throws Exception {
         String config = "<?xml version=\"1.0\" encoding=\"UTF-8\" 
?><properties>\n" +
-                "    <fetchIterators>\n" +
-                "        <fetchIterator " +
-                "       
class=\"org.apache.tika.pipes.fetchiterator.jdbc.JDBCFetchIterator\">\n" +
+                "        <pipesIterator " +
+                "       
class=\"org.apache.tika.pipes.pipesiterator.jdbc.JDBCPipesIterator\">\n" +
                 "            <params>\n" +
                 "                <fetcherName>s3f</fetcherName>\n" +
                 "                <emitterName>s3e</emitterName>\n" +
@@ -160,13 +157,12 @@ public class TestJDBCFetchIterator {
                 "                <connection>jdbc:h2:file:" + 
DB_DIR.toAbsolutePath() + "/" +
                     db + "</connection>\n" +
                 "            </params>\n" +
-                "        </fetchIterator>\n" +
-                "    </fetchIterators>\n" +
+                "        </pipesIterator>\n" +
                 "</properties>";
         Path tmp = Files.createTempFile("tika-jdbc-", ".xml");
         Files.write(tmp, config.getBytes(StandardCharsets.UTF_8));
-        FetchIteratorManager manager =
-                FetchIteratorManager.build(tmp);
+        PipesIterator manager =
+                PipesIterator.build(tmp);
         Files.delete(tmp);
         return manager;
     }
@@ -183,7 +179,7 @@ public class TestJDBCFetchIterator {
         public Integer call() throws Exception {
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
                 pairs.add(t);
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
similarity index 100%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/resources/log4j.properties
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/test/resources/log4j.properties
diff --git a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
similarity index 97%
rename from tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
rename to tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
index 8d18aca..b6f7473 100644
--- a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/pom.xml
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/pom.xml
@@ -28,7 +28,7 @@
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
-  <artifactId>tika-fetch-iterator-s3</artifactId>
+  <artifactId>tika-pipes-iterator-s3</artifactId>
 
   <name>Apache Tika Fetch Iterator - S3</name>
   <url>https://tika.apache.org/</url>
@@ -97,7 +97,7 @@
         <configuration>
           <archive>
             <manifestEntries>
-              
<Automatic-Module-Name>org.apache.tika.pipes.fetchiterator.s3</Automatic-Module-Name>
+              
<Automatic-Module-Name>org.apache.tika.pipes.pipesiterator.s3</Automatic-Module-Name>
             </manifestEntries>
           </archive>
         </configuration>
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
similarity index 95%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
index 11a7dd0..69c089a 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/main/java/org/apache/tika/pipes/fetchiterator/s3/S3FetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator.s3;
+package org.apache.tika.pipes.pipesiterator.s3;
 
 import static org.apache.tika.config.TikaConfig.mustNotBeEmpty;
 
@@ -43,11 +43,11 @@ import org.apache.tika.pipes.FetchEmitTuple;
 import org.apache.tika.pipes.HandlerConfig;
 import org.apache.tika.pipes.emitter.EmitKey;
 import org.apache.tika.pipes.fetcher.FetchKey;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
-public class S3FetchIterator extends FetchIterator implements Initializable {
+public class S3PipesIterator extends PipesIterator implements Initializable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(S3FetchIterator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(S3PipesIterator.class);
     private String prefix = "";
     private String region;
     private String credentialsProvider;
@@ -109,7 +109,7 @@ public class S3FetchIterator extends FetchIterator 
implements Initializable {
             s3Client = 
AmazonS3ClientBuilder.standard().withRegion(region).withCredentials(provider)
                     .build();
         } catch (AmazonClientException e) {
-            throw new TikaConfigException("can't initialize s3 fetchiterator");
+            throw new TikaConfigException("can't initialize s3 pipesiterator");
         }
     }
 
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
similarity index 90%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
index debed19..c647ea6 100644
--- 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-s3/src/test/java/org/apache/tika/pipes/fetchiterator/s3/TestS3FetchIterator.java
+++ 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/java/org/apache/tika/pipes/pipesiterator/s3/TestS3PipesIterator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.fetchiterator.s3;
+package org.apache.tika.pipes.pipesiterator.s3;
 
 import static org.junit.Assert.assertEquals;
 
@@ -33,15 +33,15 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 @Ignore("turn into an actual unit test")
-public class TestS3FetchIterator {
+public class TestS3PipesIterator {
 
 
     @Test
     public void testSimple() throws Exception {
-        S3FetchIterator it = new S3FetchIterator();
+        S3PipesIterator it = new S3PipesIterator();
         it.setFetcherName("s3");
         it.setBucket("");//find one
         it.setProfile("");//use one
@@ -62,7 +62,7 @@ public class TestS3FetchIterator {
             queue.offer(t);
         }
         for (int i = 0; i < numConsumers; i++) {
-            queue.offer(FetchIterator.COMPLETED_SEMAPHORE);
+            queue.offer(PipesIterator.COMPLETED_SEMAPHORE);
         }
         int finished = 0;
         int completed = 0;
@@ -90,7 +90,7 @@ public class TestS3FetchIterator {
         public Integer call() throws Exception {
             while (true) {
                 FetchEmitTuple t = queue.poll(1, TimeUnit.HOURS);
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     return pairs.size();
                 }
                 pairs.add(t);
diff --git 
a/tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
 
b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
similarity index 100%
rename from 
tika-pipes/tika-fetch-iterators/tika-fetch-iterator-jdbc/src/test/resources/log4j.properties
rename to 
tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/test/resources/log4j.properties
diff --git 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
index 5329fae..9803b80 100644
--- 
a/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
+++ 
b/tika-server/tika-server-client/src/main/java/org/apache/tika/server/client/TikaClientCLI.java
@@ -40,8 +40,7 @@ import org.xml.sax.SAXException;
 import org.apache.tika.config.TikaConfig;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.FetchEmitTuple;
-import org.apache.tika.pipes.fetchiterator.FetchIterator;
-import org.apache.tika.pipes.fetchiterator.FetchIteratorManager;
+import org.apache.tika.pipes.pipesiterator.PipesIterator;
 
 public class TikaClientCLI {
 
@@ -66,12 +65,12 @@ public class TikaClientCLI {
         ExecutorService executorService = 
Executors.newFixedThreadPool(numThreads + 1);
         ExecutorCompletionService<Integer> completionService =
                 new ExecutorCompletionService<>(executorService);
-        final FetchIterator fetchIterator =
-                FetchIteratorManager.build(tikaConfigPath).getFetchIterator();
+        final PipesIterator pipesIterator =
+                PipesIterator.build(tikaConfigPath);
         final ArrayBlockingQueue<FetchEmitTuple> queue =
                 new ArrayBlockingQueue<>(QUEUE_SIZE);
 
-        completionService.submit(new FetchIteratorWrapper(fetchIterator, 
queue, numThreads));
+        completionService.submit(new PipesIteratorWrapper(pipesIterator, 
queue, numThreads));
         if (tikaServerUrls.size() == numThreads) {
             logDiffSizes(tikaServerUrls.size(), numThreads);
             for (int i = 0; i < numThreads; i++) {
@@ -133,7 +132,7 @@ public class TikaClientCLI {
                     send(localCache);
                     throw new TimeoutException("exceeded maxWaitMs");
                 }
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     send(localCache);
                     return 1;
                 }
@@ -169,7 +168,7 @@ public class TikaClientCLI {
                 if (t == null) {
                     throw new TimeoutException("exceeded maxWaitMs");
                 }
-                if (t == FetchIterator.COMPLETED_SEMAPHORE) {
+                if (t == PipesIterator.COMPLETED_SEMAPHORE) {
                     return 1;
                 }
                 try {
@@ -182,15 +181,15 @@ public class TikaClientCLI {
         }
     }
 
-    private class FetchIteratorWrapper implements Callable<Integer> {
-        private final FetchIterator fetchIterator;
+    private class PipesIteratorWrapper implements Callable<Integer> {
+        private final PipesIterator pipesIterator;
         private final ArrayBlockingQueue<FetchEmitTuple> queue;
         private final int numThreads;
 
-        public FetchIteratorWrapper(FetchIterator fetchIterator,
+        public PipesIteratorWrapper(PipesIterator pipesIterator,
                                     ArrayBlockingQueue<FetchEmitTuple> queue,
                                     int numThreads) {
-            this.fetchIterator = fetchIterator;
+            this.pipesIterator = pipesIterator;
             this.queue = queue;
             this.numThreads = numThreads;
 
@@ -198,12 +197,12 @@ public class TikaClientCLI {
 
         @Override
         public Integer call() throws Exception {
-            for (FetchEmitTuple t : fetchIterator) {
+            for (FetchEmitTuple t : pipesIterator) {
                 //potentially blocks forever
                 queue.put(t);
             }
             for (int i = 0; i < numThreads; i ++) {
-                queue.put(FetchIterator.COMPLETED_SEMAPHORE);
+                queue.put(PipesIterator.COMPLETED_SEMAPHORE);
             }
             return 1;
         }
diff --git 
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
 
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
index 0990813..0b029d8 100644
--- 
a/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
+++ 
b/tika-server/tika-server-client/src/test/resources/tika-config-simple-fs-emitter.xml
@@ -19,14 +19,14 @@
 -->
 <properties>
     <service-loader initializableProblemHandler="throw"/>
-    <fetchIterators>
-        <fetchIterator 
class="org.apache.tika.pipes.fetchiterator.FileSystemFetchIterator">
+    <pipesIterators>
+        <pipesIterator 
class="org.apache.tika.pipes.pipesiterator.FileSystemPipesIterator">
             <params>
                 <fetcherName>fs</fetcherName>
                 <basePath>fix</basePath>
             </params>
-        </fetchIterator>
-    </fetchIterators>
+        </pipesIterator>
+    </pipesIterators>
     <fetchers>
         <fetcher class="org.apache.tika.pipes.fetcher.FileSystemFetcher">
             <params>

Reply via email to