This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4519
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 77682b95b8f6bad786db746a0f489275ddd80ed8
Author: tallison <[email protected]>
AuthorDate: Mon Nov 3 13:24:01 2025 -0500

    TIKA-4519 -- checkpoint commit
---
 .../java/org/apache/tika/plugins/PluginConfig.java |  10 +-
 .../org/apache/tika/plugins/PluginConfigs.java     |  41 +++--
 .../tika/eval/app/ExtractComparerRunner.java       |   2 +-
 .../apache/tika/eval/app/ExtractProfileRunner.java |   2 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   | 156 ++++++++-----------
 .../pipes/emitter/fs/FileSystemEmitterConfig.java  |  20 +++
 .../pipes/emitter/fs/FileSystemEmitterPlugin.java} |  40 +++--
 .../src/main/resources/emitter-plugin.properties}  |   8 +-
 .../tika/pipes/fetcher/fs/FileSystemFetcher.java   |  24 +--
 ...plugin.properties => fetcher-plugin.properties} |   0
 .../pipes/fetcher/fs/FileSystemFetcherTest.java    |  24 +--
 .../AbstractEmitter.java}                          |  19 ++-
 .../AbstractStreamEmitter.java}                    |  17 +-
 .../apache/tika/pipes/api/emitter/EmitData.java    |  19 +++
 .../org/apache/tika/pipes/api/emitter/Emitter.java |   9 +-
 .../tika/pipes/api/fetcher/AbstractFetcher.java    |   4 +-
 .../org/apache/tika/pipes/api/fetcher/Fetcher.java |   3 +-
 .../tika/pipes/api/fetcher/FetcherConfig.java      |  27 ----
 tika-pipes/tika-pipes-core-tests/pom.xml           |   6 +
 .../apache/tika/pipes/core/PassbackFilterTest.java |  16 +-
 .../apache/tika/pipes/core/PluginsTestHelper.java  |  14 +-
 .../tika/pipes/core/TikaPipesConfigTest.java       |   9 +-
 .../pipes/core/async/AsyncChaosMonkeyTest.java     |  18 +--
 .../apache/tika/pipes/core/async/MockEmitter.java  |  58 -------
 .../apache/tika/pipes/core/async/MockFetcher.java  |   4 +-
 .../tika/pipes/core/emitter/MockEmitter.java       |  54 +++++--
 .../tika/pipes/core/fetcher/MockFetcher.java       |   4 +-
 .../test/resources/configs/fetchers-emitters.json  |  16 ++
 .../src/test/resources/configs/fetchers.json       |  10 --
 .../apache/tika/pipes/core/tika-emit-config.xml    |  12 --
 .../org/apache/tika/pipes/core/PipesClient.java    |   6 +-
 .../apache/tika/pipes/core/PipesPluginsConfig.java |  97 ++++++++----
 .../org/apache/tika/pipes/core/PipesResult.java    |  14 +-
 .../org/apache/tika/pipes/core/PipesServer.java    |  19 ++-
 .../apache/tika/pipes/core/async/AsyncEmitter.java |  30 ++--
 .../tika/pipes/core/async/AsyncProcessor.java      |  14 +-
 .../apache/tika/pipes/core/async/EmitDataPair.java |   6 +
 .../emitter/{EmitData.java => EmitDataImpl.java}   |  15 +-
 .../tika/pipes/core/emitter/EmitterManager.java    |  13 +-
 .../tika/pipes/core/emitter/EmptyEmitter.java      |  23 ++-
 .../EmittingEmbeddedDocumentBytesHandler.java      |   2 +
 .../tika/pipes/core/fetcher/EmptyFetcher.java      |   4 +-
 .../tika/pipes/core/fetcher/FetcherManager.java    |   4 +-
 .../pipes/core/pipesiterator/PipesIterator.java    |  14 +-
 .../pipes/core/serialization/JsonEmitData.java     |   4 +-
 .../tika/pipes/emitter/fs/FileSystemEmitter.java   | 171 ---------------------
 .../filelist/FileListPipesIterator.java            |   8 +-
 .../pipesiterator/fs/FileSystemPipesIterator.java  |   6 +-
 .../tika/pipes/core/TikaPipesConfigTest.java       |   8 +-
 .../tika/pipes/core/emitter/MockEmitter.java       |  60 --------
 .../filelist/FileListPipesIteratorTest.java        |   8 +-
 .../fs/FileSystemPipesIteratorTest.java            |   4 +-
 .../src/test/resources/configs/fetchers.json       |   2 +-
 .../apache/tika/config/pipes-iterator-config.xml   |   3 +-
 .../tika/config/pipes-iterator-multiple-config.xml |   6 +-
 .../pipesiterator/azblob/AZBlobPipesIterator.java  |   2 +-
 .../pipes/pipesiterator/csv/CSVPipesIterator.java  |   2 +-
 .../src/test/java/TestCSVPipesIterator.java        |   2 +-
 .../pipes/pipesiterator/gcs/GCSPipesIterator.java  |   2 +-
 .../pipesiterator/jdbc/JDBCPipesIterator.java      |   6 +-
 .../pipesiterator/kafka/KafkaPipesIterator.java    |   2 +-
 .../pipes/pipesiterator/s3/S3PipesIterator.java    |   2 +-
 .../pipesiterator/solr/SolrPipesIterator.java      |   2 +-
 .../tika/serialization/PluginConfigLoader.java     |  34 ++++
 .../serialization/PluginsConfigDeserializer.java   |  27 ++++
 .../serialization/PluginsConfigSerializer.java     |  21 +++
 .../tika/serialization/PluginsConfigTest.java      |  69 +++++++++
 .../tika/server/core/resource/AsyncResource.java   |   4 +-
 68 files changed, 652 insertions(+), 710 deletions(-)

diff --git a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java
similarity index 74%
rename from tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java
rename to tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java
index 8c944ee58..532ee9d38 100644
--- a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitterConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfig.java
@@ -14,14 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.api.emitter;
+package org.apache.tika.plugins;
 
-import java.io.Serializable;
+public record PluginConfig(String pluginId, String jsonConfig) {
 
-public interface EmitterConfig extends Serializable {
-
-    String getPluginId();
-    EmitterConfig setPluginId(String pluginId);
-    String getConfigJson();
-    EmitterConfig setConfigJson(String config);
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java
similarity index 51%
rename from tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java
rename to tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java
index 986d015ee..fbed67343 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/config/DefaultFetcherConfig.java
+++ b/tika-core/src/main/java/org/apache/tika/plugins/PluginConfigs.java
@@ -14,38 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.core.fetcher.config;
+package org.apache.tika.plugins;
 
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 
-public class DefaultFetcherConfig implements FetcherConfig {
+public class PluginConfigs {
 
-    private String plugId;
-    private String configJson;
+    Map<String, PluginConfig> pluginConfigs = new HashMap<>();
+
+    public PluginConfigs() {
 
-    public DefaultFetcherConfig(String plugId, String configJson) {
-        this.plugId = plugId;
-        this.configJson = configJson;
-    }
-    @Override
-    public String getPluginId() {
-        return plugId;
     }
 
-    @Override
-    public FetcherConfig setPluginId(String pluginId) {
-        this.plugId = pluginId;
-        return this;
+    public PluginConfigs(Map<String, PluginConfig> map) {
+        pluginConfigs.putAll(map);
     }
 
-    @Override
-    public String getConfigJson() {
-        return configJson;
+    public void add(PluginConfig pluginConfig) {
+        if (pluginConfigs.containsKey(pluginConfig.pluginId())) {
+            throw new IllegalArgumentException("Can't overwrite existing plugin for id: " + pluginConfig.pluginId());
+        }
+        pluginConfigs.put(pluginConfig.pluginId(), pluginConfig);
     }
 
-    @Override
-    public FetcherConfig setConfigJson(String configJson) {
-        this.configJson = configJson;
-        return this;
+    public Optional<PluginConfig> get(String pluginId) {
+        return Optional.ofNullable(pluginConfigs.get(pluginId));
     }
+
 }
diff --git 
a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java
 
b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java
index 0ab120c81..629fba23e 100644
--- 
a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java
+++ 
b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractComparerRunner.java
@@ -180,7 +180,7 @@ public class ExtractComparerRunner {
     private static PipesIterator createIterator(Path inputDir) {
         FileSystemPipesIterator fs = new FileSystemPipesIterator(inputDir);
         fs.setFetcherName("");
-        fs.setEmitterName("");
+        fs.setEmitterPluginId("");
         return fs;
     }
 
diff --git 
a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java
 
b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java
index b618bf0af..2790bb9f0 100644
--- 
a/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java
+++ 
b/tika-eval/tika-eval-app/src/main/java/org/apache/tika/eval/app/ExtractProfileRunner.java
@@ -174,7 +174,7 @@ public class ExtractProfileRunner {
     private static PipesIterator createIterator(Path inputDir) {
         FileSystemPipesIterator fs = new FileSystemPipesIterator(inputDir);
         fs.setFetcherName("");
-        fs.setEmitterName("");
+        fs.setEmitterPluginId("");
         return fs;
     }
 
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index efc4a4fdc..cb03a1e26 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -26,62 +26,62 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.List;
+import java.util.Optional;
 
-import org.apache.tika.config.Field;
+import org.pf4j.Extension;
+
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
-import org.apache.tika.pipes.api.emitter.StreamEmitter;
+import org.apache.tika.pipes.api.emitter.AbstractStreamEmitter;
+import org.apache.tika.plugins.PluginConfig;
+import org.apache.tika.plugins.PluginConfigs;
 import org.apache.tika.serialization.JsonMetadataList;
+import org.apache.tika.utils.StringUtils;
 
 /**
  * Emitter to write to a file system.
  * <p>
- * This calculates the path to write to based on the {@link #basePath}
+ * This calculates the path to write to based on the {@link FileSystemEmitterConfig#basePath()}
  * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value.
  *
  * <pre class="prettyprint">
- *  &lt;properties&gt;
- *      &lt;emitters&gt;
- *          &lt;emitter 
class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter&gt;
- *              &lt;params&gt;
- *                  &lt;!-- required --&gt;
- *                  &lt;param name="name" type="string"&gt;fs&lt;/param&gt;
- *                  &lt;!-- required --&gt;
- *                  &lt;param name="basePath" 
type="string"&gt;/path/to/output&lt;/param&gt;
- *                  &lt;!-- optional; default is 'json' --&gt;
- *                  &lt;param name="fileExtension" 
type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; if the file already exists,
- *                       options ('skip', 'replace', 'exception')
- *                  default is 'exception' --&gt;
- *                  &lt;param name="onExists" 
type="string"&gt;skip&lt;/param&gt;
- *                  &lt;!-- optional; whether or not to pretty print the output
- *                      default is false --&gt;
- *                     &lt;param name="prettyPrint" 
type="boolean"&gt;true&lt;/param&gt;
- *              &lt;/params&gt;
- *          &lt;/emitter&gt;
- *      &lt;/emitters&gt;
- *  &lt;/properties&gt;</pre>
+ * </pre>
  */
-public class FileSystemEmitter implements StreamEmitter {
+@Extension
+public class FileSystemEmitter extends AbstractStreamEmitter {
+
+    private FileSystemEmitterConfig fileSystemEmitterConfig;
 
-    private Path basePath = null;
-    private String fileExtension = "json";
-    private ON_EXISTS onExists = ON_EXISTS.EXCEPTION;
+    public FileSystemEmitter() throws IOException {
+        super();
+    }
 
-    private boolean prettyPrint = false;
+    @Override
+    public void configure(PluginConfig pluginConfig) throws TikaConfigException, IOException {
+        checkPluginId(pluginConfig.pluginId());
+        fileSystemEmitterConfig = FileSystemEmitterConfig.load(pluginConfig.jsonConfig());
+        //checkConfig(fileSystemEmitterConfig);
+    }
 
     @Override
     public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException {
-        Path output;
+
         if (metadataList == null || metadataList.isEmpty()) {
             throw new IOException("metadata list must not be null or of size 0");
         }
 
-        if (fileExtension != null && ! fileExtension.isEmpty()) {
-            emitKey += "." + fileExtension;
+        FileSystemEmitterConfig config = getConfig(parseContext);
+
+        Path output;
+
+        if (!StringUtils.isBlank(config.fileExtension())) {
+            emitKey += "." + config.fileExtension();
         }
-        if (basePath != null) {
+
+        if (config.basePath() != null) {
+            Path basePath = Paths.get(config.basePath());
             output = basePath.resolve(emitKey);
             if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) {
                 throw new IOException("path traversal?! " + output.toAbsolutePath());
@@ -94,70 +94,36 @@ public class FileSystemEmitter implements StreamEmitter {
             Files.createDirectories(output.getParent());
         }
         try (Writer writer = Files.newBufferedWriter(output, StandardCharsets.UTF_8)) {
-            JsonMetadataList.toJson(metadataList, writer, prettyPrint);
+            JsonMetadataList.toJson(metadataList, writer, config.prettyPrint());
         }
     }
 
-    @Field
-    public void setBasePath(String basePath) {
-        this.basePath = Paths.get(basePath);
-    }
-
-    /**
-     * If you want to customize the output file's file extension.
-     * Do not include the "."
-     *
-     * @param fileExtension
-     */
-    @Field
-    public void setFileExtension(String fileExtension) {
-        this.fileExtension = fileExtension;
-    }
+    @Override
+    public void emit(String emitKey, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException {
+        FileSystemEmitterConfig config = getConfig(parseContext);
 
-    /**
-     * What to do if the target file already exists.  NOTE: if more than one
-     * thread is trying write to the same file and {@link ON_EXISTS#REPLACE} 
is chosen,
-     * you still might get a {@link FileAlreadyExistsException}.
-     *
-     * @param onExists
-     */
-    @Field
-    public void setOnExists(String onExists) {
-        switch (onExists) {
-            case "skip":
-                this.onExists = ON_EXISTS.SKIP;
-                break;
-            case "replace":
-                this.onExists = ON_EXISTS.REPLACE;
-                break;
-            case "exception":
-                this.onExists = ON_EXISTS.EXCEPTION;
-                break;
-            default:
-                throw new IllegalArgumentException("Don't understand '" + 
onExists + "'; must be one of: 'skip', 'replace', 'exception'");
+        Path output;
+        if (config.basePath() != null) {
+            Path basePath = Paths.get(config.basePath());
+            output = basePath.resolve(emitKey);
+            if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) {
+                throw new IOException("path traversal?! " + output.toAbsolutePath());
+            }
+        } else {
+            output = Paths.get(emitKey);
         }
-    }
 
-    @Field
-    public void setPrettyPrint(boolean prettyPrint) {
-        this.prettyPrint = prettyPrint;
-    }
-
-    @Override
-    public void emit(String path, InputStream inputStream, Metadata 
userMetadata, ParseContext parseContext) throws IOException {
-        Path target = basePath.resolve(path);
-
-        if (!Files.isDirectory(target.getParent())) {
-            Files.createDirectories(target.getParent());
+        if (!Files.isDirectory(output.getParent())) {
+            Files.createDirectories(output.getParent());
         }
-        if (onExists == ON_EXISTS.REPLACE) {
-            Files.copy(inputStream, target, 
StandardCopyOption.REPLACE_EXISTING);
-        } else if (onExists == ON_EXISTS.EXCEPTION) {
-            Files.copy(inputStream, target);
-        } else if (onExists == ON_EXISTS.SKIP) {
-            if (!Files.isRegularFile(target)) {
+        if (config.onExists() == ON_EXISTS.REPLACE) {
+            Files.copy(inputStream, output, StandardCopyOption.REPLACE_EXISTING);
+        } else if (config.onExists() == ON_EXISTS.EXCEPTION) {
+            Files.copy(inputStream, output);
+        } else if (config.onExists() == ON_EXISTS.SKIP) {
+            if (!Files.isRegularFile(output)) {
                 try {
-                    Files.copy(inputStream, target);
+                    Files.copy(inputStream, output);
                 } catch (FileAlreadyExistsException e) {
                     //swallow
                 }
@@ -165,7 +131,15 @@ public class FileSystemEmitter implements StreamEmitter {
         }
     }
 
-    enum ON_EXISTS {
-        SKIP, EXCEPTION, REPLACE
+    private FileSystemEmitterConfig getConfig(ParseContext parseContext) throws IOException {
+        FileSystemEmitterConfig config = fileSystemEmitterConfig;
+        PluginConfigs pluginConfigs = parseContext.get(PluginConfigs.class);
+        if (pluginConfigs != null) {
+            Optional<PluginConfig> pluginConfigOpt = pluginConfigs.get(getPluginId());
+            if (pluginConfigOpt.isPresent()) {
+                config = FileSystemEmitterConfig.load(pluginConfigOpt.get().jsonConfig());
+            }
+        }
+        return config;
     }
 }
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java
 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java
new file mode 100644
index 000000000..9106d8b0c
--- /dev/null
+++ 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterConfig.java
@@ -0,0 +1,20 @@
+package org.apache.tika.pipes.emitter.fs;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+enum ON_EXISTS {
+    SKIP, EXCEPTION, REPLACE
+}
+
+public record FileSystemEmitterConfig(String basePath, String fileExtension, ON_EXISTS onExists, boolean prettyPrint) {
+
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    public static FileSystemEmitterConfig load(String json) throws IOException {
+        return OBJECT_MAPPER.readValue(json, FileSystemEmitterConfig.class);
+    }
+
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java
 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java
similarity index 54%
rename from 
tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java
rename to 
tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java
index 4d41cd7d6..e8bec02a5 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/config/DefaultEmitterConfig.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitterPlugin.java
@@ -14,38 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.core.emitter.config;
+package org.apache.tika.pipes.emitter.fs;
 
-import org.apache.tika.pipes.api.emitter.EmitterConfig;
+import org.pf4j.Plugin;
+import org.pf4j.PluginWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class DefaultEmitterConfig implements EmitterConfig {
+public class FileSystemEmitterPlugin extends Plugin {
+    private static final Logger LOG = LoggerFactory.getLogger(FileSystemEmitterPlugin.class);
 
-    private String plugId;
-    private String configJson;
-
-    public DefaultEmitterConfig(String plugId, String configJson) {
-        this.plugId = plugId;
-        this.configJson = configJson;
-    }
-    @Override
-    public String getPluginId() {
-        return plugId;
+    public FileSystemEmitterPlugin(PluginWrapper wrapper) {
+        super(wrapper);
     }
 
     @Override
-    public EmitterConfig setPluginId(String pluginId) {
-        this.plugId = pluginId;
-        return this;
+    public void start() {
+        LOG.info("Starting");
+        super.start();
     }
 
     @Override
-    public String getConfigJson() {
-        return configJson;
+    public void stop() {
+        LOG.info("Stopping");
+        super.stop();
     }
 
     @Override
-    public EmitterConfig setConfigJson(String configJson) {
-        this.configJson = configJson;
-        return this;
+    public void delete() {
+        LOG.info("Deleting");
+        super.delete();
     }
+
 }
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties
 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties
similarity index 80%
copy from 
tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties
copy to 
tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties
index b2488f75d..a85876524 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties
+++ 
b/tika-pipes/tika-emitters/tika-emitter-file-system/src/main/resources/emitter-plugin.properties
@@ -14,8 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-plugin.id=file-system-fetcher
-plugin.class=org.apache.tika.pipes.fetcher.fs.FileSystemFetcherPlugin
+plugin.id=file-system-emitter
+plugin.class=org.apache.tika.pipes.emitter.fs.FileSystemEmitterPlugin
 plugin.version=4.0.0-SNAPSHOT
-plugin.provider=Local File System Fetcher
-plugin.description=Capable of fetching the local file system
+plugin.provider=Local File System Emitter
+plugin.description=Capable of emitting the local file system
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index 3e3df6b73..3f6d0ffde 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -25,6 +25,7 @@ import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
 import java.util.Date;
+import java.util.Optional;
 
 import org.pf4j.Extension;
 import org.slf4j.Logger;
@@ -39,8 +40,9 @@ import org.apache.tika.metadata.Property;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
 import org.apache.tika.pipes.fetcher.fs.config.FileSystemFetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
+import org.apache.tika.plugins.PluginConfigs;
 import org.apache.tika.utils.StringUtils;
 
 /**
@@ -72,9 +74,9 @@ public class FileSystemFetcher extends AbstractFetcher {
     }
 
     @Override
-    public void configure(FetcherConfig fetcherConfig) throws IOException, 
TikaConfigException {
-        checkPluginId(fetcherConfig.getPluginId());
-        defaultFileSystemFetcherConfig = 
FileSystemFetcherConfig.load(fetcherConfig.getConfigJson());
+    public void configure(PluginConfig pluginConfig) throws IOException, TikaConfigException {
+        checkPluginId(pluginConfig.pluginId());
+        defaultFileSystemFetcherConfig = FileSystemFetcherConfig.load(pluginConfig.jsonConfig());
         checkConfig(defaultFileSystemFetcherConfig);
     }
 
@@ -86,11 +88,15 @@ public class FileSystemFetcher extends AbstractFetcher {
                     "a file name with this character in it.");
         }
         FileSystemFetcherConfig config = defaultFileSystemFetcherConfig;
-        FetcherConfig fetcherConfig = parseContext.get(FetcherConfig.class);
-        if (fetcherConfig != null) {
-            checkPluginId(fetcherConfig.getPluginId());
-            config = 
FileSystemFetcherConfig.load(fetcherConfig.getConfigJson());
-            checkConfig(config);
+        PluginConfigs pluginConfigManager = parseContext.get(PluginConfigs.class);
+        if (pluginConfigManager != null) {
+            Optional<PluginConfig> pluginConfigOpt = pluginConfigManager.get(getPluginId());
+            if (pluginConfigOpt.isPresent()) {
+                PluginConfig pluginConfig = pluginConfigOpt.get();
+                checkPluginId(pluginConfig.pluginId());
+                config = FileSystemFetcherConfig.load(pluginConfig.jsonConfig());
+                checkConfig(config);
+            }
         }
         Path p = null;
         if (! StringUtils.isBlank(config.getBasePath())) {
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties
 
b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/fetcher-plugin.properties
similarity index 100%
rename from 
tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/plugin.properties
rename to 
tika-pipes/tika-fetchers/tika-fetcher-file-system/src/main/resources/fetcher-plugin.properties
diff --git 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
 
b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
index 0abd34f48..e87372a23 100644
--- 
a/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
+++ 
b/tika-pipes/tika-fetchers/tika-fetcher-file-system/src/test/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcherTest.java
@@ -26,7 +26,7 @@ import java.nio.file.Paths;
 import org.junit.jupiter.api.Test;
 
 import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 
 public class FileSystemFetcherTest {
@@ -49,27 +49,7 @@ public class FileSystemFetcherTest {
     public void testNullByte() throws Exception {
         FileSystemFetcher f = new FileSystemFetcher();
         assertThrows(TikaConfigException.class, () -> {
-            f.configure(new FetcherConfig() {
-                @Override
-                public String getPluginId() {
-                    return "blah";
-                }
-
-                @Override
-                public FetcherConfig setPluginId(String pluginId) {
-                    return this;
-                }
-
-                @Override
-                public String getConfigJson() {
-                    return "{ \"basePath\":\"bad\u0000path\"}";
-                }
-
-                @Override
-                public FetcherConfig setConfigJson(String config) {
-                    return this;
-                }
-            });
+            f.configure(new PluginConfig("test", "{ \"basePath\":\"bad\u0000path\"}"));
         });
     }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
similarity index 75%
copy from 
tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
copy to 
tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
index f0554217d..4d5f73849 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractEmitter.java
@@ -14,24 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.api.fetcher;
+package org.apache.tika.pipes.api.emitter;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.fetcher.Fetcher;
 
-public abstract class AbstractFetcher implements Fetcher {
+public abstract class AbstractEmitter implements Emitter {
 
     private final String pluginId;
-    public AbstractFetcher() throws IOException {
+    public AbstractEmitter() throws IOException {
         Properties properties = new Properties();
-        try (InputStream is = this.getClass().getResourceAsStream("/plugin.properties")) {
+        try (InputStream is = this.getClass().getResourceAsStream("/emitter-plugin.properties")) {
             properties.load(is);
         }
         pluginId = (String) properties.get("plugin.id");
-
     }
 
     @Override
@@ -39,6 +40,14 @@ public abstract class AbstractFetcher implements Fetcher {
         return pluginId;
     }
 
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException {
+        for (EmitData item : emitData) {
+            emit(item.getEmitKey(), item.getMetadataList(), item.getParseContext());
+        }
+    }
+
+
     protected void checkPluginId(String pluginId) throws TikaConfigException {
         if (! getPluginId().equals(pluginId)) {
             throw new TikaConfigException("Plugin id mismatch: " + getPluginId() + " <> " + pluginId);
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
similarity index 76%
copy from 
tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
copy to 
tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
index f0554217d..0f12535d3 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/AbstractStreamEmitter.java
@@ -14,24 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.tika.pipes.api.fetcher;
+package org.apache.tika.pipes.api.emitter;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.tika.exception.TikaConfigException;
 
-public abstract class AbstractFetcher implements Fetcher {
+public abstract class AbstractStreamEmitter implements StreamEmitter {
 
     private final String pluginId;
-    public AbstractFetcher() throws IOException {
+    public AbstractStreamEmitter() throws IOException {
         Properties properties = new Properties();
-        try (InputStream is = 
this.getClass().getResourceAsStream("/plugin.properties")) {
+        try (InputStream is = 
this.getClass().getResourceAsStream("/emitter-plugin.properties")) {
             properties.load(is);
         }
         pluginId = (String) properties.get("plugin.id");
-
     }
 
     @Override
@@ -44,4 +44,11 @@ public abstract class AbstractFetcher implements Fetcher {
             throw new TikaConfigException("Plugin id mismatch: " + 
getPluginId() + " <> " + pluginId);
         }
     }
+
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException {
+        for (EmitData item : emitData) {
+            emit(item.getEmitKey(), item.getMetadataList(), 
item.getParseContext());
+        }
+    }
 }
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
new file mode 100644
index 000000000..3c1b09dbb
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/EmitData.java
@@ -0,0 +1,19 @@
+package org.apache.tika.pipes.api.emitter;
+
+import java.util.List;
+
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+
+public interface EmitData {
+    String getEmitKey();
+
+    List<Metadata> getMetadataList();
+
+    String getContainerStackTrace();
+
+    long getEstimatedSizeBytes();
+
+    ParseContext getParseContext();
+
+}
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java
index aa7122a50..dad43467a 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/emitter/Emitter.java
@@ -19,18 +19,23 @@ package org.apache.tika.pipes.api.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.pf4j.ExtensionPoint;
+
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.plugins.PluginConfig;
 
-public interface Emitter {
+public interface Emitter extends ExtensionPoint {
 
-    void configure(EmitterConfig emitterConfig) throws TikaConfigException, 
IOException;
+    void configure(PluginConfig pluginConfig) throws TikaConfigException, 
IOException;
 
     String getPluginId();
 
     void emit(String emitKey, List<Metadata> metadataList, ParseContext 
parseContext) throws IOException;
 
+    void emit(List<? extends EmitData> emitData) throws IOException;
+
     //TODO -- add this later for xhtml?
     //void emit(String txt, Metadata metadata) throws IOException, 
TikaException;
 
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
index f0554217d..6d796b584 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/AbstractFetcher.java
@@ -25,13 +25,13 @@ import org.apache.tika.exception.TikaConfigException;
 public abstract class AbstractFetcher implements Fetcher {
 
     private final String pluginId;
+
     public AbstractFetcher() throws IOException {
         Properties properties = new Properties();
-        try (InputStream is = 
this.getClass().getResourceAsStream("/plugin.properties")) {
+        try (InputStream is = 
this.getClass().getResourceAsStream("/fetcher-plugin.properties")) {
             properties.load(is);
         }
         pluginId = (String) properties.get("plugin.id");
-
     }
 
     @Override
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
index c4b523fd1..07469022e 100644
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
+++ 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/Fetcher.java
@@ -25,6 +25,7 @@ import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.plugins.PluginConfig;
 
 /**
  * Interface for an object that will fetch an InputStream given
@@ -35,7 +36,7 @@ import org.apache.tika.parser.ParseContext;
  */
 public interface Fetcher extends ExtensionPoint {
 
-    void configure(FetcherConfig fetcherConfig) throws TikaConfigException, 
IOException;
+    void configure(PluginConfig fetcherConfig) throws TikaConfigException, 
IOException;
 
     String getPluginId();
 
diff --git 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java
 
b/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java
deleted file mode 100644
index 2c5e75811..000000000
--- 
a/tika-pipes/tika-pipes-api/src/main/java/org/apache/tika/pipes/api/fetcher/FetcherConfig.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.api.fetcher;
-
-import java.io.Serializable;
-
-public interface FetcherConfig extends Serializable {
-
-    String getPluginId();
-    FetcherConfig setPluginId(String pluginId);
-    String getConfigJson();
-    FetcherConfig setConfigJson(String config);
-}
diff --git a/tika-pipes/tika-pipes-core-tests/pom.xml 
b/tika-pipes/tika-pipes-core-tests/pom.xml
index c8b531526..c6a8a91de 100644
--- a/tika-pipes/tika-pipes-core-tests/pom.xml
+++ b/tika-pipes/tika-pipes-core-tests/pom.xml
@@ -44,6 +44,12 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>tika-emitter-file-system</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>tika-fetcher-file-system</artifactId>
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
index 3bf377855..a4eea1ce5 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PassbackFilterTest.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -47,23 +48,19 @@ public class PassbackFilterTest {
 
     private Path tmpDir;
     String fetcherPluginId = "file-system-fetcher";
+    String emitterPluginId = "file-system-emitter";
     String testPdfFile = "testOverlappingText.pdf";
 
     private PipesClient pipesClient;
 
     @BeforeEach
     public void init() throws Exception {
-        Path tikaConfigTemplate = Paths.get("src", "test", "resources", "org", 
"apache", "tika", "pipes", "core", "tika-emit-config.xml");
+        Path tikaConfig = Paths.get("src", "test", "resources", "org", 
"apache", "tika", "pipes", "core", "tika-emit-config.xml");
         tmpDir = Files.createTempDirectory("tika-pipes");
         Path pipesConfigPath = 
PluginsTestHelper.getFileSystemFetcherConfig(tmpDir);
 
         Path tikaConfigPath = Files.createTempFile(tmpDir, "tika-pipes-", 
".xml");
-        String template = Files.readString(tikaConfigTemplate, 
StandardCharsets.UTF_8);
-        template = template.replace("EMITTER_BASE_PATH", tmpDir
-                .toAbsolutePath()
-                .toString());
-        Files.writeString(tikaConfigPath, template);
-
+        Files.copy(tikaConfig, tikaConfigPath, 
StandardCopyOption.REPLACE_EXISTING);
         PipesConfig pipesConfig = PipesConfig.load(tikaConfigPath, 
pipesConfigPath);
         PluginsTestHelper.copyTestFilesToTmpInput(tmpDir, testPdfFile);
 
@@ -81,7 +78,8 @@ public class PassbackFilterTest {
         ParseContext parseContext = new ParseContext();
         parseContext.set(PassbackFilter.class, new MyPassbackFilter());
         PipesResult pipesResult = pipesClient.process(
-                new FetchEmitTuple(testPdfFile, new FetchKey(fetcherPluginId, 
testPdfFile), new EmitKey("fs", emitFileBase), new Metadata(), parseContext,
+                new FetchEmitTuple(testPdfFile, new FetchKey(fetcherPluginId, 
testPdfFile),
+                        new EmitKey(emitterPluginId, emitFileBase), new 
Metadata(), parseContext,
                         FetchEmitTuple.ON_PARSE_EXCEPTION.SKIP));
         assertEquals(PipesResult.STATUS.EMIT_SUCCESS_PASSBACK, 
pipesResult.getStatus());
         Assertions.assertNotNull(pipesResult
@@ -100,7 +98,7 @@ public class PassbackFilterTest {
         assertNull(metadata.get(Metadata.CONTENT_LENGTH));
         assertEquals(1, metadata.names().length);
 
-        List<Metadata> metadataList = 
JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve(emitFileBase + 
".json"), StandardCharsets.UTF_8));
+        List<Metadata> metadataList = 
JsonMetadataList.fromJson(Files.newBufferedReader(tmpDir.resolve("output").resolve(emitFileBase
 + ".json"), StandardCharsets.UTF_8));
         assertEquals(1, metadataList.size());
         assertEquals("application/pdf", metadataList
                 .get(0)
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
index 625710810..d89b479e1 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/PluginsTestHelper.java
@@ -25,19 +25,25 @@ import java.nio.file.Paths;
 public class PluginsTestHelper {
 
     public static Path getFileSystemFetcherConfig(Path configBase) throws 
Exception {
-        return getFileSystemFetcherConfig(configBase, 
configBase.resolve("input"));
+        return getFileSystemFetcherConfig(configBase, 
configBase.resolve("input"), configBase.resolve("output"));
     }
 
-    public static Path getFileSystemFetcherConfig(Path configBase, Path 
filesBase) throws Exception {
+    public static Path getFileSystemFetcherConfig(Path configBase, Path 
fetcherBase, Path emitterBase) throws Exception {
         Path pipesConfig = configBase.resolve("pipes-config.json");
 
-        Path tikaPluginsTemplate = Paths.get("src", "test", "resources", 
"configs", "fetchers.json");
+        Path tikaPluginsTemplate = Paths.get("src", "test", "resources", 
"configs", "fetchers-emitters.json");
         String json = Files.readString(tikaPluginsTemplate, 
StandardCharsets.UTF_8);
 
-        json = json.replace("BASE_PATH", filesBase
+        json = json.replace("FETCHERS_BASE_PATH", fetcherBase
                 .toAbsolutePath()
                 .toString());
 
+        if (emitterBase != null) {
+            json = json.replace("EMITTERS_BASE_PATH", emitterBase
+                    .toAbsolutePath()
+                    .toString());
+        }
+
         Files.write(pipesConfig, json.getBytes(StandardCharsets.UTF_8));
         return pipesConfig;
     }
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
index 02bb42c06..6f049652e 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.tika.config.AbstractTikaConfigTest;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.pipesiterator.PipesIterator;
 
@@ -63,15 +64,15 @@ public class TikaPipesConfigTest extends 
AbstractTikaConfigTest {
 
         FetcherManager fetcherManager = FetcherManager.load(
                 getConfigFilePath("fetchers-nobasepath-config.xml"));
-    }*/
+    }
 
     @Test
     public void testEmitters() throws Exception {
         EmitterManager emitterManager =
                 EmitterManager.load(getConfigFilePath("emitters-config.xml"));
-        Emitter em1 = emitterManager.getEmitter("em1");
+        Emitter em1 = emitterManager.getEmitter("file-system-emitter-1");
         assertNotNull(em1);
-        Emitter em2 = emitterManager.getEmitter("em2");
+        Emitter em2 = emitterManager.getEmitter("file-system-emitter-2");
         assertNotNull(em2);
     }
 
@@ -81,7 +82,7 @@ public class TikaPipesConfigTest extends 
AbstractTikaConfigTest {
             
EmitterManager.load(getConfigFilePath("emitters-duplicate-config.xml"));
         });
     }
-
+*/
     @Test
     public void testPipesIterator() throws Exception {
         PipesIterator it =
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
index eb836c93d..81ee817b3 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/AsyncChaosMonkeyTest.java
@@ -30,11 +30,12 @@ import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.pipes.core.FetchEmitTuple;
 import org.apache.tika.pipes.core.PipesResult;
 import org.apache.tika.pipes.core.PluginsTestHelper;
-import org.apache.tika.pipes.core.emitter.EmitData;
 import org.apache.tika.pipes.core.emitter.EmitKey;
+import org.apache.tika.pipes.core.emitter.MockEmitter;
 import org.apache.tika.pipes.core.fetcher.FetchKey;
 import org.apache.tika.pipes.core.pipesiterator.PipesIterator;
 
@@ -82,10 +83,7 @@ public class AsyncChaosMonkeyTest {
         crash = 0;
         Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", 
".xml");
         String xml =
-                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" 
+ "  <emitters>" +
-                "  <emitter 
class=\"org.apache.tika.pipes.core.async.MockEmitter\">\n" +
-                "         <name>mock</name>\n" + "  </emitter>" +
-                "  </emitters>" +
+                "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" 
+
                 " <autoDetectParserConfig>\n" +
                         "    <digesterFactory\n" +
                         "        
class=\"org.apache.tika.pipes.core.async.MockDigesterFactory\"/>\n" +
@@ -120,7 +118,7 @@ public class AsyncChaosMonkeyTest {
         }
         MockEmitter.EMIT_DATA.clear();
         MockReporter.RESULTS.clear();
-        pipesPluginsConfigPath = 
PluginsTestHelper.getFileSystemFetcherConfig(configDir, inputDir);
+        pipesPluginsConfigPath = 
PluginsTestHelper.getFileSystemFetcherConfig(configDir, inputDir, null);
         return tikaConfigPath;
     }
 
@@ -143,7 +141,7 @@ public class AsyncChaosMonkeyTest {
         for (int i = 0; i < totalFiles; i++) {
             FetchEmitTuple t = new FetchEmitTuple("myId-" + i,
                     new FetchKey(fetcherPluginId, i + ".xml"),
-                    new EmitKey("mock", "emit-" + i), new Metadata());
+                    new EmitKey("mock-emitter", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
         for (int i = 0; i < 10; i++) {
@@ -156,7 +154,7 @@ public class AsyncChaosMonkeyTest {
         processor.close();
         Set<String> emitKeys = new HashSet<>();
         for (EmitData d : MockEmitter.EMIT_DATA) {
-            emitKeys.add(d.getEmitKey().getEmitKey());
+            emitKeys.add(d.getEmitKey());
         }
         assertEquals(ok, emitKeys.size());
         assertEquals(100, MockReporter.RESULTS.size());
@@ -171,7 +169,7 @@ public class AsyncChaosMonkeyTest {
         AsyncProcessor processor = new AsyncProcessor(setUp(true), 
pipesPluginsConfigPath);
         for (int i = 0; i < totalFiles; i++) {
             FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new 
FetchKey(fetcherPluginId, i + ".xml"),
-                    new EmitKey("mock", "emit-" + i), new Metadata());
+                    new EmitKey("mock-emitter", "emit-" + i), new Metadata());
             processor.offer(t, 1000);
         }
         for (int i = 0; i < 10; i++) {
@@ -185,7 +183,7 @@ public class AsyncChaosMonkeyTest {
         Set<String> emitKeys = new HashSet<>();
         int observedOOM = 0;
         for (EmitData d : MockEmitter.EMIT_DATA) {
-            emitKeys.add(d.getEmitKey().getEmitKey());
+            emitKeys.add(d.getEmitKey());
             assertEquals(64,
                     
d.getMetadataList().get(0).get("X-TIKA:digest:SHA-256").trim().length());
             assertEquals("application/mock+xml",
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java
deleted file mode 100644
index fab797ed1..000000000
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockEmitter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.core.async;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.pipes.core.emitter.EmitData;
-import org.apache.tika.pipes.core.emitter.EmitKey;
-import org.apache.tika.pipes.core.emitter.TikaEmitterException;
-
-public class MockEmitter extends AbstractEmitter {
-
-    static ArrayBlockingQueue<EmitData> EMIT_DATA = new 
ArrayBlockingQueue<>(10000);
-
-    public MockEmitter() {
-    }
-
-    public static List<EmitData> getData() {
-        return new ArrayList<>(EMIT_DATA);
-    }
-
-    @Override
-    public void emit(String emitKey, List<Metadata> metadataList, ParseContext 
parseContext)
-            throws IOException, TikaEmitterException {
-        emit(
-                Collections.singletonList(new EmitData(new EmitKey(getName(), 
emitKey),
-                        metadataList, null, parseContext)));
-    }
-
-    @Override
-    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
-        int inserted = 0;
-        for (EmitData d : emitData) {
-            EMIT_DATA.offer(d);
-        }
-    }
-
-}
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java
index 2b869374c..a6f05cbbd 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/async/MockFetcher.java
@@ -26,7 +26,7 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 public class MockFetcher extends AbstractFetcher {
 
@@ -39,7 +39,7 @@ public class MockFetcher extends AbstractFetcher {
     }
 
     @Override
-    public void configure(FetcherConfig fetcherConfig) throws 
TikaConfigException, IOException {
+    public void configure(PluginConfig fetcherConfig) throws 
TikaConfigException, IOException {
         //no-op
     }
 
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
index 6d32ea2c4..86fa68250 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
@@ -17,44 +17,80 @@
 package org.apache.tika.pipes.core.emitter;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.pf4j.Extension;
 
-import org.apache.tika.config.Field;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.config.Param;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.plugins.PluginConfig;
 
-public class MockEmitter extends AbstractEmitter implements Initializable {
+@Extension
+public class MockEmitter implements Initializable, Emitter {
 
-    @Field
-    private boolean throwOnCheck = false;
+    public static ArrayBlockingQueue<EmitData> EMIT_DATA = new 
ArrayBlockingQueue<>(10000);
 
-    @Override
-    public void initialize(Map<String, Param> params) throws 
TikaConfigException {
+    public static List<EmitData> getData() {
+        return new ArrayList<>(EMIT_DATA);
+    }
+
+    public MockEmitter() throws IOException {
+    }
+
+    private static record MockEmitterConfig(boolean throwOnCheck) {
 
     }
 
-    public void setThrowOnCheck(boolean throwOnCheck) {
-        this.throwOnCheck = throwOnCheck;
+    private MockEmitterConfig config = new MockEmitterConfig(true);
+
+    @Override
+    public void initialize(Map<String, Param> params) throws 
TikaConfigException {
+        //no-op
     }
 
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
 
-        if (throwOnCheck) {
+        if (config.throwOnCheck()) {
             throw new TikaConfigException("throw on check");
         }
 
     }
 
+    @Override
+    public void configure(PluginConfig pluginConfig) throws 
TikaConfigException, IOException {
+        config = new ObjectMapper().readValue(pluginConfig.jsonConfig(), 
MockEmitterConfig.class);
+    }
+
+    @Override
+    public String getPluginId() {
+        return "mock-emitter";
+    }
+
     @Override
     public void emit(String emitKey, List<Metadata> metadataList, ParseContext 
parseContext)
             throws IOException, TikaEmitterException {
+        emit(
+                Collections.singletonList(new EmitDataImpl(emitKey,
+                        metadataList, null, parseContext)));
+    }
+    @Override
+    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
 
+        for (EmitData d : emitData) {
+            EMIT_DATA.offer(d);
+        }
     }
 }
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java
 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java
index 68955d69b..9741d35d5 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/java/org/apache/tika/pipes/core/fetcher/MockFetcher.java
@@ -29,7 +29,7 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 public class MockFetcher extends AbstractFetcher {
 
@@ -55,7 +55,7 @@ public class MockFetcher extends AbstractFetcher {
     }
 
     @Override
-    public void configure(FetcherConfig fetcherConfig) throws 
TikaConfigException, IOException {
+    public void configure(PluginConfig fetcherConfig) throws 
TikaConfigException, IOException {
         //no-op
     }
 
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json
 
b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json
new file mode 100644
index 000000000..3e2409761
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers-emitters.json
@@ -0,0 +1,16 @@
+{
+  "plugins" : {
+    "fetchers": {
+      "file-system-fetcher": {
+        "basePath": "FETCHERS_BASE_PATH",
+        "extractFileSystemMetadata": false
+      }
+    },
+    "emitters": {
+      "file-system-emitter": {
+        "basePath": "EMITTERS_BASE_PATH",
+        "fileExtension": "json"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json 
b/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json
deleted file mode 100644
index 5a46c0444..000000000
--- a/tika-pipes/tika-pipes-core-tests/src/test/resources/configs/fetchers.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
-  "pipesPluginsConfig" : {
-    "fetchers": {
-      "file-system-fetcher": {
-        "basePath": "BASE_PATH",
-        "extractFileSystemMetadata": false
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
 
b/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
index 58e168233..69d72bf02 100644
--- 
a/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
+++ 
b/tika-pipes/tika-pipes-core-tests/src/test/resources/org/apache/tika/pipes/core/tika-emit-config.xml
@@ -32,16 +32,4 @@
       <skipContainerDocument>false</skipContainerDocument>
     </digesterFactory>
   </autoDetectParserConfig>
-<!--  <fetchers>
-    <fetcher class="org.apache.tika.pipes.fetcher.fs.FileSystemFetcher">
-      <name>fs</name>
-      <basePath>src/test/resources/test-documents</basePath>
-    </fetcher>
-  </fetchers> -->
-  <emitters>
-    <emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter">
-      <name>fs</name>
-      <basePath>EMITTER_BASE_PATH</basePath>
-    </emitter>
-  </emitters>
 </properties>
\ No newline at end of file
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
index 11fffa7fb..cb64a4f09 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesClient.java
@@ -51,7 +51,7 @@ import org.apache.tika.config.TikaTaskTimeout;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.metadata.TikaCoreProperties;
 import org.apache.tika.parser.ParseContext;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.emitter.EmitKey;
 import org.apache.tika.utils.ProcessUtils;
 import org.apache.tika.utils.StringUtils;
@@ -376,7 +376,7 @@ public class PipesClient implements Closeable {
         input.readFully(bytes);
         try (ObjectInputStream objectInputStream = new ObjectInputStream(
                 
UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) {
-            EmitData emitData = (EmitData) objectInputStream.readObject();
+            EmitDataImpl emitData = (EmitDataImpl) 
objectInputStream.readObject();
 
             String stack = emitData.getContainerStackTrace();
             if (StringUtils.isBlank(stack)) {
@@ -405,7 +405,7 @@ public class PipesClient implements Closeable {
         try (ObjectInputStream objectInputStream = new ObjectInputStream(
                 
UnsynchronizedByteArrayInputStream.builder().setByteArray(bytes).get())) {
             Metadata metadata = (Metadata) objectInputStream.readObject();
-            EmitData emitDataTuple = new EmitData(emitKey, 
Collections.singletonList(metadata));
+            EmitDataImpl emitDataTuple = new 
EmitDataImpl(emitKey.getEmitKey(), Collections.singletonList(metadata));
             return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, 
emitDataTuple, true);
         } catch (ClassNotFoundException e) {
             LOG.error("class not found exception deserializing data", e);
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java
index 3ef3a9ec0..6fa63b7a6 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesPluginsConfig.java
@@ -23,69 +23,102 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Optional;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import org.apache.tika.pipes.api.emitter.EmitterConfig;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
-import org.apache.tika.pipes.core.fetcher.config.DefaultFetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
+import org.apache.tika.plugins.PluginConfigs;
+
 
 public class PipesPluginsConfig {
 
     public static PipesPluginsConfig load(InputStream is) throws IOException {
         JsonNode root = new ObjectMapper().readTree(new BufferedReader(new 
InputStreamReader(is, StandardCharsets.UTF_8)));
-        JsonNode plugins = root.get("pipesPluginsConfig");
-        Map<String, FetcherConfig> fetcherMap = new HashMap<>();
+        PluginConfigs fetchers = null;
+        PluginConfigs emitters = null;
+        PluginConfigs iterators = null;
+        PluginConfigs reporters = null;
+        //fail with a descriptive IOException, not an NPE, if the root or its "plugins" element is absent
+        JsonNode plugins = Optional.ofNullable(root).map(r -> r.get("plugins")).orElseThrow(() -> new IOException("missing 'plugins' element"));
         if (plugins.has("fetchers")) {
-            JsonNode fetchers = plugins.get("fetchers");
-            Iterator<String> it = fetchers.fieldNames();
-            while (it.hasNext()) {
-                String pluginId = it.next();
-                JsonNode fetcherConfig = fetchers.get(pluginId);
-                fetcherMap.put(pluginId, new DefaultFetcherConfig(pluginId, 
fetcherConfig.toString()));
-            }
+            fetchers = load(plugins.get("fetchers"));
         }
-        Map<String, FetcherConfig> emitterMap = new HashMap<>();
         if (plugins.has("emitters")) {
-            JsonNode emitters = plugins.get("emitters");
-            Iterator<String> it = emitters.fieldNames();
-            while (it.hasNext()) {
-                String pluginId = it.next();
-                JsonNode emitterConfig = emitters.get(pluginId);
-                emitterMap.put(pluginId, new EmitterConfigImpl(pluginId, 
emitterConfig.toString()));
-            }
+            emitters = load(plugins.get("emitters"));
+        }
+        if (plugins.has("iterators")) {
+            iterators = load(plugins.get("iterators"));
+        }
+        if (plugins.has("reporters")) {
+            reporters = load(plugins.get("reporters"));
         }
+
         Path pluginsDir = null;
         if (plugins.has("pf4j.pluginsDir")) {
             pluginsDir = Paths.get(plugins.get("pf4j.pluginsDir").asText());
         }
-        return new PipesPluginsConfig(fetcherMap, emitterMap, pluginsDir);
+        return new PipesPluginsConfig(fetchers, emitters, iterators, 
reporters, pluginsDir);
+    }
+
+    //Builds a PluginConfigs from a JSON object: each field name is a plugin id and
+    //the field's value, serialized back to a JSON string, is that plugin's config.
+    private static PluginConfigs load(JsonNode pluginsNode) {
+        PluginConfigs manager = new PluginConfigs();
+        Iterator<String> it = pluginsNode.fieldNames();
+        while (it.hasNext()) {
+            String pluginId = it.next();
+            manager.add(new PluginConfig(pluginId, pluginsNode.get(pluginId).toString()));
+        }
+        return manager;
     }
 
-    private final Map<String, FetcherConfig> fetcherMap;
-    private final Map<String, EmitterConfig> emitterMap;
+    private final PluginConfigs fetchers;
+    private final PluginConfigs emitters;
+    private final PluginConfigs iterators;
+    private final PluginConfigs reporters;
 
 
     private final Path pluginsDir;
-    private PipesPluginsConfig(Map<String, FetcherConfig> fetcherMap, 
Map<String, EmitterConfig> emitterMap, Path pluginsDir) {
-        this.fetcherMap = fetcherMap;
-        this.emitterMap = emitterMap;
+
+    public PipesPluginsConfig(PluginConfigs fetchers, PluginConfigs emitters,
+                              PluginConfigs iterators, PluginConfigs 
reporters, Path pluginsDir) {
+        this.fetchers = fetchers;
+        this.emitters = emitters;
+        this.iterators = iterators;
+        this.reporters = reporters;
         this.pluginsDir = pluginsDir;
     }
 
-    public Optional<FetcherConfig> getFetcherConfig(String pluginId) {
-        return Optional.ofNullable(fetcherMap.get(pluginId));
+    public Optional<PluginConfig> getFetcherConfig(String pluginId) {
+        if (fetchers == null) {
+            throw new IllegalArgumentException("fetchers element was not 
loaded");
+        }
+        return fetchers.get(pluginId);
+    }
+
+    public Optional<PluginConfig> getEmitterConfig(String pluginId) {
+        if (emitters == null) {
+            throw new IllegalArgumentException("emitters element was not 
loaded");
+        }
+        return emitters.get(pluginId);
     }
 
-    public Optional<EmitterConfig> getEmitterConfig(String pluginId) {
-        return Optional.ofNullable(emitterMap.get(pluginId));
+    public Optional<PluginConfig> getIteratorConfig(String pluginId) {
+        if (iterators == null) {
+            throw new IllegalArgumentException("iterators element was not 
loaded");
+        }
+        return iterators.get(pluginId);
     }
 
+    public Optional<PluginConfig> getReporterConfig(String pluginId) {
+        if (reporters == null) {
+            throw new IllegalArgumentException("reporters element was not 
loaded");
+        }
+        return reporters.get(pluginId);
+    }
 
     public Optional<Path> getPluginsDir() {
         return Optional.ofNullable(pluginsDir);
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
index 3f391ae28..d9ad9f0be 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesResult.java
@@ -16,7 +16,7 @@
  */
 package org.apache.tika.pipes.core;
 
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 
 public class PipesResult {
 
@@ -49,10 +49,10 @@ public class PipesResult {
     public static final PipesResult EMPTY_OUTPUT =
             new PipesResult(STATUS.EMPTY_OUTPUT);
     private final STATUS status;
-    private final EmitData emitDataTuple;
+    private final EmitDataImpl emitDataTuple;
     private final String message;
 
-    public PipesResult(STATUS status, EmitData emitDataTuple, String message, 
boolean intermediate) {
+    public PipesResult(STATUS status, EmitDataImpl emitDataTuple, String 
message, boolean intermediate) {
         this.status = status;
         this.emitDataTuple = emitDataTuple;
         this.message = message;
@@ -72,11 +72,11 @@ public class PipesResult {
      *
      * @param emitDataTuple
      */
-    public PipesResult(EmitData emitDataTuple) {
+    public PipesResult(EmitDataImpl emitDataTuple) {
         this(STATUS.PARSE_SUCCESS, emitDataTuple, null, false);
     }
 
-    public PipesResult(STATUS status, EmitData emitDataTuple, boolean 
intermediate) {
+    public PipesResult(STATUS status, EmitDataImpl emitDataTuple, boolean 
intermediate) {
         this(status, emitDataTuple, null, intermediate);
     }
 
@@ -87,7 +87,7 @@ public class PipesResult {
      * @param emitDataTuple
      * @param message
      */
-    public PipesResult(EmitData emitDataTuple, String message) {
+    public PipesResult(EmitDataImpl emitDataTuple, String message) {
         this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitDataTuple, message, 
false);
     }
 
@@ -95,7 +95,7 @@ public class PipesResult {
         return status;
     }
 
-    public EmitData getEmitData() {
+    public EmitDataImpl getEmitData() {
         return emitDataTuple;
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
index b7429b9a5..ff520877b 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/PipesServer.java
@@ -69,10 +69,9 @@ import org.apache.tika.parser.RecursiveParserWrapper;
 import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.api.emitter.StreamEmitter;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
-import org.apache.tika.pipes.core.emitter.TikaEmitterException;
 import org.apache.tika.pipes.core.extractor.BasicEmbeddedDocumentBytesHandler;
 import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
 import 
org.apache.tika.pipes.core.extractor.EmittingEmbeddedDocumentBytesHandler;
@@ -309,7 +308,7 @@ public class PipesServer implements Runnable {
         try {
             emitter = emitterManager.getEmitter(emitKey.getEmitterPluginId());
         } catch (IllegalArgumentException e) {
-            String noEmitterMsg = getNoEmitterMsg(taskId);
+            String noEmitterMsg = getNoEmitterMsg(taskId, 
emitKey.getEmitterPluginId());
             LOG.warn(noEmitterMsg);
             write(STATUS.EMITTER_NOT_FOUND, noEmitterMsg);
             return;
@@ -354,7 +353,7 @@ public class PipesServer implements Runnable {
             exit(1);
         }
 
-        EmitData filteredEmitDataTuple = new EmitData(emitKey, filtered, 
parseExceptionStack);
+        EmitDataImpl filteredEmitDataTuple = new 
EmitDataImpl(emitKey.getEmitKey(), filtered, parseExceptionStack);
 
         try {
             UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
@@ -490,7 +489,7 @@ public class PipesServer implements Runnable {
                 emitKey = new EmitKey(emitKey.getEmitterPluginId(), 
t.getFetchKey().getFetchKey());
                 t.setEmitKey(emitKey);
             }
-            EmitData emitDataTuple = new EmitData(t.getEmitKey(), 
parseData.getMetadataList(), stack);
+            EmitDataImpl emitDataTuple = new 
EmitDataImpl(t.getEmitKey().getEmitKey(), parseData.getMetadataList(), stack);
             if (shouldEmit(embeddedDocumentBytesConfig, parseData, 
emitDataTuple)) {
                 emit(t.getId(), emitKey, 
embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(),
                         parseData, stack, parseContext);
@@ -506,7 +505,7 @@ public class PipesServer implements Runnable {
         }
     }
 
-    private boolean shouldEmit(EmbeddedDocumentBytesConfig 
embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, EmitData 
emitDataTuple) {
+    private boolean shouldEmit(EmbeddedDocumentBytesConfig 
embeddedDocumentBytesConfig, MetadataListAndEmbeddedBytes parseData, 
EmitDataImpl emitDataTuple) {
         if (emitStrategy == EMIT_STRATEGY.EMIT_ALL) {
             return true;
         } else if 
(embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() &&
@@ -597,9 +596,9 @@ public class PipesServer implements Runnable {
         return sb.toString();
     }
 
-    private String getNoEmitterMsg(String emitterName) {
+    private String getNoEmitterMsg(String taskName, String emitterName) {
         StringBuilder sb = new StringBuilder();
-        sb.append("Emitter '").append(emitterName).append("'");
+        sb.append("Emitter for task='").append(taskName).append("' 
emitter='").append(emitterName).append("'");
         sb.append(" not found.");
         sb.append("\nThe configured emitterManager supports:");
         int i = 0;
@@ -853,7 +852,7 @@ public class PipesServer implements Runnable {
         //skip initialization of the emitters if emitting
         //from the pipesserver is turned off.
         if (maxForEmitBatchBytes > -1) {
-            this.emitterManager = EmitterManager.load(tikaConfigPath);
+            this.emitterManager = EmitterManager.load(pipesConfigPath);
         } else {
             LOG.debug("'maxForEmitBatchBytes' < 0. Not initializing emitters 
in PipesServer");
             this.emitterManager = null;
@@ -893,7 +892,7 @@ public class PipesServer implements Runnable {
         }
     }
 
-    private void write(EmitData emitData) {
+    private void write(EmitDataImpl emitData) {
         try {
             UnsynchronizedByteArrayOutputStream bos = 
UnsynchronizedByteArrayOutputStream.builder().get();
             try (ObjectOutputStream objectOutputStream = new 
ObjectOutputStream(bos)) {
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
index 63e1f8fc3..a985f6ee4 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncEmitter.java
@@ -30,10 +30,10 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.pipes.api.emitter.Emitter;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
-import org.apache.tika.pipes.core.emitter.TikaEmitterException;
 import org.apache.tika.utils.ExceptionUtils;
 
 /**
@@ -42,18 +42,18 @@ import org.apache.tika.utils.ExceptionUtils;
  */
 public class AsyncEmitter implements Callable<Integer> {
 
-    static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null, 
null);
+    static final EmitDataPair EMIT_DATA_STOP_SEMAPHORE = new 
EmitDataPair(null, null);
     static final int EMITTER_FUTURE_CODE = 2;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncEmitter.class);
 
     private final AsyncConfig asyncConfig;
     private final EmitterManager emitterManager;
-    private final ArrayBlockingQueue<EmitData> emitDataQueue;
+    private final ArrayBlockingQueue<EmitDataPair> emitDataQueue;
 
     Instant lastEmitted = Instant.now();
 
-    public AsyncEmitter(AsyncConfig asyncConfig, ArrayBlockingQueue<EmitData> 
emitData,
+    public AsyncEmitter(AsyncConfig asyncConfig, 
ArrayBlockingQueue<EmitDataPair> emitData,
                         EmitterManager emitterManager) {
         this.asyncConfig = asyncConfig;
         this.emitDataQueue = emitData;
@@ -65,14 +65,14 @@ public class AsyncEmitter implements Callable<Integer> {
         EmitDataCache cache = new 
EmitDataCache(asyncConfig.getEmitMaxEstimatedBytes());
 
         while (true) {
-            EmitData emitData = emitDataQueue.poll(500, TimeUnit.MILLISECONDS);
-            if (emitData == EMIT_DATA_STOP_SEMAPHORE) {
+            EmitDataPair emitDataPair = emitDataQueue.poll(500, 
TimeUnit.MILLISECONDS);
+            if (emitDataPair == EMIT_DATA_STOP_SEMAPHORE) {
                 cache.emitAll();
                 return EMITTER_FUTURE_CODE;
             }
-            if (emitData != null) {
+            if (emitDataPair != null) {
                 //this can block on emitAll
-                cache.add(emitData);
+                cache.add(emitDataPair);
             } else {
                 LOG.trace("Nothing on the async queue");
             }
@@ -102,17 +102,17 @@ public class AsyncEmitter implements Callable<Integer> {
             estimatedSize += newBytes;
         }
 
-        void add(EmitData data) {
+        void add(EmitDataPair emitDataPair) {
             size++;
-            long sz = data.getEstimatedSizeBytes();
+            long sz = emitDataPair.emitData().getEstimatedSizeBytes();
             if (estimatedSize + sz > maxBytes) {
                 LOG.debug("estimated size ({}) > maxBytes({}), going to 
emitAll",
                         (estimatedSize + sz), maxBytes);
                 emitAll();
             }
-            List<EmitData> cached = 
map.computeIfAbsent(data.getEmitKey().getEmitterPluginId(), k -> new 
ArrayList<>());
+            List<EmitData> cached = 
map.computeIfAbsent(emitDataPair.emitterPluginId(), k -> new ArrayList<>());
             updateEstimatedSize(sz);
-            cached.add(data);
+            cached.add(emitDataPair.emitData());
         }
 
         private void emitAll() {
@@ -131,10 +131,10 @@ public class AsyncEmitter implements Callable<Integer> {
             lastEmitted = Instant.now();
         }
 
-        private void tryToEmit(Emitter emitter, List<EmitData> 
cachedEmitDatumTuples) {
+        private void tryToEmit(Emitter emitter, List<? extends EmitData> 
emitData) {
 
             try {
-                emitter.emit(cachedEmitDatumTuples);
+                emitter.emit(emitData);
             } catch (IOException e) {
                 LOG.warn("emitter class ({}): {}", emitter.getClass(),
                         ExceptionUtils.getStackTrace(e));
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
index e57be5621..2c15d05ef 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/AsyncProcessor.java
@@ -39,7 +39,7 @@ import org.apache.tika.pipes.core.PipesClient;
 import org.apache.tika.pipes.core.PipesException;
 import org.apache.tika.pipes.core.PipesReporter;
 import org.apache.tika.pipes.core.PipesResult;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.pipesiterator.PipesIterator;
 import org.apache.tika.pipes.core.pipesiterator.TotalCountResult;
@@ -58,7 +58,7 @@ public class AsyncProcessor implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncProcessor.class);
 
     private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
-    private final ArrayBlockingQueue<EmitData> emitDatumTuples;
+    private final ArrayBlockingQueue<EmitDataPair> emitDatumTuples;
     private final ExecutorCompletionService<Integer> executorCompletionService;
     private final ExecutorService executorService;
     private final AsyncConfig asyncConfig;
@@ -110,7 +110,7 @@ public class AsyncProcessor implements Closeable {
                         new FetchEmitWorker(asyncConfig, fetchEmitTuples, 
emitDatumTuples));
             }
 
-            EmitterManager emitterManager = 
EmitterManager.load(asyncConfig.getTikaConfig());
+            EmitterManager emitterManager = 
EmitterManager.load(asyncConfig.getPipesPluginsConfig());
             for (int i = 0; i < asyncConfig.getNumEmitters(); i++) {
                 executorCompletionService.submit(
                         new AsyncEmitter(asyncConfig, emitDatumTuples, 
emitterManager));
@@ -262,11 +262,11 @@ public class AsyncProcessor implements Closeable {
 
         private final AsyncConfig asyncConfig;
         private final ArrayBlockingQueue<FetchEmitTuple> fetchEmitTuples;
-        private final ArrayBlockingQueue<EmitData> emitDataTupleQueue;
+        private final ArrayBlockingQueue<EmitDataPair> emitDataTupleQueue;
 
         private FetchEmitWorker(AsyncConfig asyncConfig,
                                 ArrayBlockingQueue<FetchEmitTuple> 
fetchEmitTuples,
-                                ArrayBlockingQueue<EmitData> 
emitDataTupleQueue) {
+                                ArrayBlockingQueue<EmitDataPair> 
emitDataTupleQueue) {
             this.asyncConfig = asyncConfig;
             this.fetchEmitTuples = fetchEmitTuples;
             this.emitDataTupleQueue = emitDataTupleQueue;
@@ -305,8 +305,8 @@ public class AsyncProcessor implements Closeable {
 
                         if (shouldEmit(result)) {
                             LOG.trace("adding result to emitter queue: " + 
result.getEmitData());
-                            boolean offered = 
emitDataTupleQueue.offer(result.getEmitData(),
-                                    MAX_OFFER_WAIT_MS,
+                            boolean offered = emitDataTupleQueue.offer(
+                                    new 
EmitDataPair(t.getEmitKey().getEmitterPluginId(), result.getEmitData()), 
MAX_OFFER_WAIT_MS,
                                     TimeUnit.MILLISECONDS);
                             if (! offered) {
                                 throw new RuntimeException("Couldn't offer 
emit data to queue " +
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java
new file mode 100644
index 000000000..e51152058
--- /dev/null
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/async/EmitDataPair.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.pipes.core.async;
+
+import org.apache.tika.pipes.api.emitter.EmitData;
+
+/**
+ * Pairs an {@link EmitData} with the id of the emitter plugin that should emit it.
+ * Either component may be {@code null}.
+ *
+ * @param emitterPluginId id of the emitter plugin that should receive {@code emitData}
+ * @param emitData the data to hand to that emitter
+ */
+public record EmitDataPair(String emitterPluginId, EmitData emitData) {
+}
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
similarity index 83%
rename from 
tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java
rename to 
tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
index 4c9996d12..1aee991f1 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitData.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitDataImpl.java
@@ -21,28 +21,29 @@ import java.util.List;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.emitter.EmitData;
 import org.apache.tika.utils.StringUtils;
 
-public class EmitData implements Serializable {
+public class EmitDataImpl implements Serializable, EmitData {
     /**
      * Serial version UID
      */
     private static final long serialVersionUID = -3861669115439125268L;
 
-    private final EmitKey emitKey;
+    private final String emitKey;
     private final List<Metadata> metadataList;
     private final String containerStackTrace;
     private ParseContext parseContext = null;
 
-    public EmitData(EmitKey emitKey, List<Metadata> metadataList) {
+    public EmitDataImpl(String emitKey, List<Metadata> metadataList) {
         this(emitKey, metadataList, StringUtils.EMPTY);
     }
 
-    public EmitData(EmitKey emitKey, List<Metadata> metadataList, String 
containerStackTrace) {
+    public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace) {
         this(emitKey, metadataList, containerStackTrace, new ParseContext());
     }
 
-    public EmitData(EmitKey emitKey, List<Metadata> metadataList, String 
containerStackTrace, ParseContext parseContext) {
+    public EmitDataImpl(String emitKey, List<Metadata> metadataList, String 
containerStackTrace, ParseContext parseContext) {
         this.emitKey = emitKey;
         this.metadataList = metadataList;
         this.containerStackTrace = (containerStackTrace == null) ? 
StringUtils.EMPTY :
@@ -50,7 +51,7 @@ public class EmitData implements Serializable {
         this.parseContext = parseContext;
     }
 
-    public EmitKey getEmitKey() {
+    public String getEmitKey() {
         return emitKey;
     }
 
@@ -63,7 +64,7 @@ public class EmitData implements Serializable {
     }
 
     public long getEstimatedSizeBytes() {
-        return estimateSizeInBytes(getEmitKey().getEmitKey(), 
getMetadataList(), containerStackTrace);
+        return estimateSizeInBytes(getEmitKey(), getMetadataList(), 
containerStackTrace);
     }
 
     public void setParseContext(ParseContext parseContext) {
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
index 4dcfcc8fa..308c56bc2 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmitterManager.java
@@ -21,7 +21,6 @@ import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -29,14 +28,16 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.pf4j.DefaultPluginManager;
 import org.pf4j.PluginManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.tika.config.ConfigBase;
 import org.apache.tika.config.Initializable;
 import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.pipes.api.emitter.Emitter;
-import org.apache.tika.pipes.api.emitter.EmitterConfig;
 import org.apache.tika.pipes.core.PipesPluginsConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 /**
  * Utility class that will apply the appropriate emitter
@@ -44,7 +45,10 @@ import org.apache.tika.pipes.core.PipesPluginsConfig;
  * <p>
  * This does not allow multiple emitters supporting the same prefix.
  */
-public class EmitterManager extends ConfigBase {
+public class EmitterManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EmitterManager.class);
+
 
     private final Map<String, Emitter> emitterMap = new ConcurrentHashMap<>();
 
@@ -66,7 +70,8 @@ public class EmitterManager extends ConfigBase {
         pluginManager.startPlugins();
         Map<String, Emitter> emitterMap = new HashMap<>();
         for (Emitter emitter : pluginManager.getExtensions(Emitter.class)) {
-            Optional<EmitterConfig> emitterConfig = 
pluginsConfig.getEmitterConfig(emitter.getPluginId());
+            LOG.debug("EMITTER PLUGIN ID: {} : {}", emitter.getPluginId(), emitter.getClass());
+            Optional<PluginConfig> emitterConfig = 
pluginsConfig.getEmitterConfig(emitter.getPluginId());
             if (emitterConfig.isPresent()) {
                 emitter.configure(emitterConfig.get());
                 if (emitter instanceof Initializable) {
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java
index 6d9f03b7d..c3430ee98 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/emitter/EmptyEmitter.java
@@ -19,24 +19,33 @@ package org.apache.tika.pipes.core.emitter;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.emitter.AbstractEmitter;
+import org.apache.tika.pipes.api.emitter.EmitData;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.plugins.PluginConfig;
 
-public class EmptyEmitter implements Emitter {
+public class EmptyEmitter extends AbstractEmitter {
 
-    @Override
-    public String getName() {
-        return "empty";
+    public EmptyEmitter() throws IOException {
     }
 
     @Override
-    public void emit(String emitKey, List<Metadata> metadataList, ParseContext 
parseContext)
-            throws IOException, TikaEmitterException {
+    public void configure(PluginConfig pluginConfig) throws 
TikaConfigException, IOException {
+        //no-op
+    }
 
+    @Override
+    public String getPluginId() {
+        return "";
     }
 
     @Override
-    public void emit(List<? extends EmitData> emitData) throws IOException, 
TikaEmitterException {
+    public void emit(String emitKey, List<Metadata> metadataList, ParseContext 
parseContext)
+            throws IOException, TikaEmitterException {
 
     }
+
 }
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
index ac9413583..d00d97662 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -23,6 +23,8 @@ import java.io.InputStream;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.api.emitter.Emitter;
+import org.apache.tika.pipes.api.emitter.StreamEmitter;
 import org.apache.tika.pipes.core.FetchEmitTuple;
 import org.apache.tika.pipes.core.emitter.EmitKey;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java
index 09467b946..a6c576ea6 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/EmptyFetcher.java
@@ -24,12 +24,12 @@ import org.apache.tika.exception.TikaException;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 public class EmptyFetcher implements Fetcher {
 
     @Override
-    public void configure(FetcherConfig fetcherConfig) throws 
TikaConfigException {
+    public void configure(PluginConfig pluginConfig) throws 
TikaConfigException {
         //no-op
     }
 
diff --git 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
index 2f9d4e5c1..dc38202a4 100644
--- 
a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
+++ 
b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/fetcher/FetcherManager.java
@@ -36,8 +36,8 @@ import org.apache.tika.config.InitializableProblemHandler;
 import org.apache.tika.exception.TikaConfigException;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.pipes.api.fetcher.Fetcher;
-import org.apache.tika.pipes.api.fetcher.FetcherConfig;
 import org.apache.tika.pipes.core.PipesPluginsConfig;
+import org.apache.tika.plugins.PluginConfig;
 
 /**
  * Utility class to hold multiple fetchers.
@@ -66,7 +66,7 @@ public class FetcherManager {
         pluginManager.startPlugins();
         Map<String, Fetcher> fetcherMap = new HashMap<>();
         for (Fetcher fetcher : pluginManager.getExtensions(Fetcher.class)) {
-            Optional<FetcherConfig> fetcherConfig = pluginsConfig.getFetcherConfig(fetcher.getPluginId());
+            Optional<PluginConfig> fetcherConfig = pluginsConfig.getFetcherConfig(fetcher.getPluginId());
             if (fetcherConfig.isPresent()) {
                 fetcher.configure(fetcherConfig.get());
                 if (fetcher instanceof Initializable) {
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java
index ef4473112..5bde0a20f 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/pipesiterator/PipesIterator.java
@@ -65,7 +65,7 @@ public abstract class PipesIterator extends ConfigBase
     private ArrayBlockingQueue<FetchEmitTuple> queue = null;
     private int queueSize = DEFAULT_QUEUE_SIZE;
     private String fetcherPluginId;
-    private String emitterName;
+    private String emitterPluginId;
     private FetchEmitTuple.ON_PARSE_EXCEPTION onParseException =
             FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT;
     private BasicContentHandlerFactory.HANDLER_TYPE handlerType =
@@ -89,22 +89,22 @@ public abstract class PipesIterator extends ConfigBase
         }
     }
 
-    public String getFetcherName() {
+    public String getFetcherPluginId() {
         return fetcherPluginId;
     }
 
     @Field
-    public void setFetcherName(String fetcherPluginId) {
+    public void setFetcherPluginId(String fetcherPluginId) {
         this.fetcherPluginId = fetcherPluginId;
     }
 
-    public String getEmitterName() {
-        return emitterName;
+    public String getEmitterPluginId() {
+        return emitterPluginId;
     }
 
     @Field
-    public void setEmitterName(String emitterName) {
-        this.emitterName = emitterName;
+    public void setEmitterPluginId(String emitterPluginId) {
+        this.emitterPluginId = emitterPluginId;
     }
 
     @Field
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java
index cf86bf80c..11b6df585 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/core/serialization/JsonEmitData.java
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.core.FetchEmitTuple;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.serialization.MetadataSerializer;
 import org.apache.tika.serialization.ParseContextSerializer;
 
@@ -40,7 +40,7 @@ public class JsonEmitData {
         OBJECT_MAPPER.registerModule(module);
     }
 
-    public static void toJson(EmitData emitDataTuple, Writer writer) throws IOException {
+    public static void toJson(EmitDataImpl emitDataTuple, Writer writer) throws IOException {
         OBJECT_MAPPER.writeValue(writer, emitDataTuple);
     }
 }
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
deleted file mode 100644
index e126ef66e..000000000
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.emitter.fs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.List;
-
-import org.apache.tika.config.Field;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.parser.ParseContext;
-import org.apache.tika.pipes.core.emitter.TikaEmitterException;
-import org.apache.tika.serialization.JsonMetadataList;
-
-/**
- * Emitter to write to a file system.
- * <p>
- * This calculates the path to write to based on the {@link #basePath}
- * and the value of the {@link TikaCoreProperties#SOURCE_PATH} value.
- *
- * <pre class="prettyprint">
- *  &lt;properties&gt;
- *      &lt;emitters&gt;
- *          &lt;emitter class="org.apache.tika.pipes.emitter.fs.FileSystemEmitter&gt;
- *              &lt;params&gt;
- *                  &lt;!-- required --&gt;
- *                  &lt;param name="name" type="string"&gt;fs&lt;/param&gt;
- *                  &lt;!-- required --&gt;
- *                  &lt;param name="basePath" type="string"&gt;/path/to/output&lt;/param&gt;
- *                  &lt;!-- optional; default is 'json' --&gt;
- *                  &lt;param name="fileExtension" type="string"&gt;json&lt;/param&gt;
- *                  &lt;!-- optional; if the file already exists,
- *                       options ('skip', 'replace', 'exception')
- *                  default is 'exception' --&gt;
- *                  &lt;param name="onExists" type="string"&gt;skip&lt;/param&gt;
- *                  &lt;!-- optional; whether or not to pretty print the output
- *                      default is false --&gt;
- *                     &lt;param name="prettyPrint" type="boolean"&gt;true&lt;/param&gt;
- *              &lt;/params&gt;
- *          &lt;/emitter&gt;
- *      &lt;/emitters&gt;
- *  &lt;/properties&gt;</pre>
- */
-public class FileSystemEmitter extends AbstractEmitter implements StreamEmitter {
-
-    private Path basePath = null;
-    private String fileExtension = "json";
-    private ON_EXISTS onExists = ON_EXISTS.EXCEPTION;
-
-    private boolean prettyPrint = false;
-
-    @Override
-    public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext) throws IOException, TikaEmitterException {
-        Path output;
-        if (metadataList == null || metadataList.isEmpty()) {
-            throw new TikaEmitterException("metadata list must not be null or of size 0");
-        }
-
-        if (fileExtension != null && ! fileExtension.isEmpty()) {
-            emitKey += "." + fileExtension;
-        }
-        if (basePath != null) {
-            output = basePath.resolve(emitKey);
-            if (!output.toAbsolutePath().normalize().startsWith(basePath.toAbsolutePath().normalize())) {
-                throw new TikaEmitterException("path traversal?! " + output.toAbsolutePath());
-            }
-        } else {
-            output = Paths.get(emitKey);
-        }
-
-        if (output.getParent() != null && !Files.isDirectory(output.getParent())) {
-            Files.createDirectories(output.getParent());
-        }
-        try (Writer writer = Files.newBufferedWriter(output, StandardCharsets.UTF_8)) {
-            JsonMetadataList.toJson(metadataList, writer, prettyPrint);
-        }
-    }
-
-    @Field
-    public void setBasePath(String basePath) {
-        this.basePath = Paths.get(basePath);
-    }
-
-    /**
-     * If you want to customize the output file's file extension.
-     * Do not include the "."
-     *
-     * @param fileExtension
-     */
-    @Field
-    public void setFileExtension(String fileExtension) {
-        this.fileExtension = fileExtension;
-    }
-
-    /**
-     * What to do if the target file already exists.  NOTE: if more than one
-     * thread is trying write to the same file and {@link ON_EXISTS#REPLACE} is chosen,
-     * you still might get a {@link FileAlreadyExistsException}.
-     *
-     * @param onExists
-     */
-    @Field
-    public void setOnExists(String onExists) {
-        switch (onExists) {
-            case "skip":
-                this.onExists = ON_EXISTS.SKIP;
-                break;
-            case "replace":
-                this.onExists = ON_EXISTS.REPLACE;
-                break;
-            case "exception":
-                this.onExists = ON_EXISTS.EXCEPTION;
-                break;
-            default:
-                throw new IllegalArgumentException("Don't understand '" + onExists + "'; must be one of: 'skip', 'replace', 'exception'");
-        }
-    }
-
-    @Field
-    public void setPrettyPrint(boolean prettyPrint) {
-        this.prettyPrint = prettyPrint;
-    }
-
-    @Override
-    public void emit(String path, InputStream inputStream, Metadata userMetadata, ParseContext parseContext) throws IOException, TikaEmitterException {
-        Path target = basePath.resolve(path);
-
-        if (!Files.isDirectory(target.getParent())) {
-            Files.createDirectories(target.getParent());
-        }
-        if (onExists == ON_EXISTS.REPLACE) {
-            Files.copy(inputStream, target, StandardCopyOption.REPLACE_EXISTING);
-        } else if (onExists == ON_EXISTS.EXCEPTION) {
-            Files.copy(inputStream, target);
-        } else if (onExists == ON_EXISTS.SKIP) {
-            if (!Files.isRegularFile(target)) {
-                try {
-                    Files.copy(inputStream, target);
-                } catch (FileAlreadyExistsException e) {
-                    //swallow
-                }
-            }
-        }
-    }
-
-    enum ON_EXISTS {
-        SKIP, EXCEPTION, REPLACE
-    }
-}
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
index 19199e1b0..3d97845fd 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/filelist/FileListPipesIterator.java
@@ -67,8 +67,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
             String line = reader.readLine();
             while (line != null) {
                 if (! line.startsWith("#") && !StringUtils.isBlank(line)) {
-                    FetchKey fetchKey = new FetchKey(getFetcherName(), line);
-                    EmitKey emitKey = new EmitKey(getEmitterName(), line);
+                    FetchKey fetchKey = new FetchKey(getFetcherPluginId(), line);
+                    EmitKey emitKey = new EmitKey(getEmitterPluginId(), line);
                     ParseContext parseContext = new ParseContext();
                     parseContext.set(HandlerConfig.class, getHandlerConfig());
                     tryToAdd(new FetchEmitTuple(line, fetchKey, emitKey,
@@ -95,8 +95,8 @@ public class FileListPipesIterator extends PipesIterator implements Initializabl
             throws TikaConfigException {
         //these should all be fatal
         TikaConfig.mustNotBeEmpty("fileList", fileList);
-        TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName());
-        TikaConfig.mustNotBeEmpty("emitterName", getFetcherName());
+        TikaConfig.mustNotBeEmpty("fetcherPluginId", getFetcherPluginId());
+        TikaConfig.mustNotBeEmpty("emitterPluginId", getEmitterPluginId());
 
         fileListPath = Paths.get(fileList);
         if (!Files.isRegularFile(fileListPath)) {
diff --git a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
index a8a498808..69a75f97a 100644
--- a/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
+++ b/tika-pipes/tika-pipes-core/src/main/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIterator.java
@@ -79,7 +79,7 @@ public class FileSystemPipesIterator extends PipesIterator
         }
 
         try {
-            Files.walkFileTree(basePath, new FSFileVisitor(getFetcherName(), getEmitterName()));
+            Files.walkFileTree(basePath, new FSFileVisitor(getFetcherPluginId(), getEmitterPluginId()));
         } catch (IOException e) {
             Throwable cause = e.getCause();
             if (cause != null && cause instanceof TimeoutException) {
@@ -95,8 +95,8 @@ public class FileSystemPipesIterator extends PipesIterator
             throws TikaConfigException {
         //these should all be fatal
         TikaConfig.mustNotBeEmpty("basePath", basePath);
-        TikaConfig.mustNotBeEmpty("fetcherName", getFetcherName());
-        TikaConfig.mustNotBeEmpty("emitterName", getFetcherName());
+        TikaConfig.mustNotBeEmpty("fetcherPluginId", getFetcherPluginId());
+        TikaConfig.mustNotBeEmpty("emitterPluginId", getEmitterPluginId());
     }
 
     @Override
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
index f241b79d5..06e745503 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/TikaPipesConfigTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import org.apache.tika.config.AbstractTikaConfigTest;
 import org.apache.tika.exception.TikaConfigException;
+import org.apache.tika.pipes.api.emitter.Emitter;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.pipesiterator.PipesIterator;
 
@@ -63,7 +64,7 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest {
 
         FetcherManager fetcherManager = FetcherManager.load(
                 getConfigFilePath("fetchers-nobasepath-config.xml"));
-    }*/
+    }
 
     @Test
     public void testEmitters() throws Exception {
@@ -81,12 +82,13 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest {
             EmitterManager.load(getConfigFilePath("emitters-duplicate-config.xml"));
         });
     }
+    */
 
     @Test
     public void testPipesIterator() throws Exception {
         PipesIterator it =
                 PipesIterator.build(getConfigFilePath("pipes-iterator-config.xml"));
-        assertEquals("fs1", it.getFetcherName());
+        assertEquals("fsf1", it.getFetcherPluginId());
     }
 
     @Test
@@ -94,7 +96,7 @@ public class TikaPipesConfigTest extends AbstractTikaConfigTest {
         assertThrows(TikaConfigException.class, () -> {
             PipesIterator it =
                     PipesIterator.build(getConfigFilePath("pipes-iterator-multiple-config.xml"));
-            assertEquals("fs1", it.getFetcherName());
+            assertEquals("fsf1", it.getFetcherPluginId());
         });
     }
 }
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
deleted file mode 100644
index 6d32ea2c4..000000000
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/emitter/MockEmitter.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.core.emitter;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.tika.config.Field;
-import org.apache.tika.config.Initializable;
-import org.apache.tika.config.InitializableProblemHandler;
-import org.apache.tika.config.Param;
-import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-
-public class MockEmitter extends AbstractEmitter implements Initializable {
-
-    @Field
-    private boolean throwOnCheck = false;
-
-    @Override
-    public void initialize(Map<String, Param> params) throws TikaConfigException {
-
-    }
-
-    public void setThrowOnCheck(boolean throwOnCheck) {
-        this.throwOnCheck = throwOnCheck;
-    }
-
-    @Override
-    public void checkInitialization(InitializableProblemHandler problemHandler)
-            throws TikaConfigException {
-
-        if (throwOnCheck) {
-            throw new TikaConfigException("throw on check");
-        }
-
-    }
-
-    @Override
-    public void emit(String emitKey, List<Metadata> metadataList, ParseContext parseContext)
-            throws IOException, TikaEmitterException {
-
-    }
-}
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java
index 42606b0d7..2cf38ef51 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/core/pipesiterator/filelist/FileListPipesIteratorTest.java
@@ -36,8 +36,8 @@ public class FileListPipesIteratorTest {
     public void testBasic() throws Exception {
         Path p = Paths.get(this.getClass().getResource("/test-documents/file-list.txt").toURI());
         FileListPipesIterator it = new FileListPipesIterator();
-        it.setFetcherName("f");
-        it.setEmitterName("e");
+        it.setFetcherPluginId("f");
+        it.setEmitterPluginId("e");
         it.setFileList(p.toAbsolutePath().toString());
         it.setHasHeader(false);
         it.checkInitialization(InitializableProblemHandler.DEFAULT);
@@ -59,8 +59,8 @@ public class FileListPipesIteratorTest {
     public void testHasHeader() throws Exception {
         Path p = Paths.get(this.getClass().getResource("/test-documents/file-list.txt").toURI());
         FileListPipesIterator it = new FileListPipesIterator();
-        it.setFetcherName("f");
-        it.setEmitterName("e");
+        it.setFetcherPluginId("f");
+        it.setEmitterPluginId("e");
         it.setFileList(p.toAbsolutePath().toString());
         it.setHasHeader(true);
         it.checkInitialization(InitializableProblemHandler.DEFAULT);
diff --git a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java
index cfcb12318..9178aeb2b 100644
--- a/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java
+++ b/tika-pipes/tika-pipes-core/src/test/java/org/apache/tika/pipes/pipesiterator/fs/FileSystemPipesIteratorTest.java
@@ -61,9 +61,9 @@ public class FileSystemPipesIteratorTest {
             truthSet.add(fetchString);
         }
 
-        String fetcherName = "fs";
+        String fetcherName = "file-system-fetcher";
         PipesIterator it = new FileSystemPipesIterator(root);
-        it.setFetcherName(fetcherName);
+        it.setFetcherPluginId(fetcherName);
         it.setQueueSize(2);
 
         Set<String> iteratorSet = new HashSet<>();
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json b/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json
index e153df966..b0e9e0ff0 100644
--- a/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json
+++ b/tika-pipes/tika-pipes-core/src/test/resources/configs/fetchers.json
@@ -1,5 +1,5 @@
 {
-  "pipesPluginsConfig" : {
+  "plugins" : {
     "fetchers": {
       "file-system-fetcher": {
         "basePath": "{BASE_PATH}",
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
index 50d006142..31f7f0a1a 100644
--- a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
+++ b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-config.xml
@@ -18,7 +18,8 @@
 <properties>
   <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
     <params>
-      <fetcherPluginId>fs1</fetcherPluginId>
+      <fetcherPluginId>fsf1</fetcherPluginId>
+      <emitterPluginId>fse1</emitterPluginId>
       <basePath>/my/base/path1</basePath>
     </params>
   </pipesIterator>
diff --git a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
index e4e127d30..f7bed7010 100644
--- a/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
+++ b/tika-pipes/tika-pipes-core/src/test/resources/org/apache/tika/config/pipes-iterator-multiple-config.xml
@@ -18,13 +18,15 @@
 <properties>
   <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
     <params>
-      <fetcherPluginId>fs1</fetcherPluginId>
+      <fetcherPluginId>fsf1</fetcherPluginId>
+      <emitterPluginId>fse1</emitterPluginId>
       <basePath>/my/base/path1</basePath>
     </params>
   </pipesIterator>
   <pipesIterator class="org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator">
     <params>
-      <fetcherPluginId>fs2</fetcherPluginId>
+      <fetcherPluginId>fsf2</fetcherPluginId>
+      <emitterPluginId>fse2</emitterPluginId>
       <basePath>/my/base/path2</basePath>
     </params>
   </pipesIterator>
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
index ed4ad524f..d059128af 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-az-blob/src/main/java/org/apache/tika/pipes/pipesiterator/azblob/AZBlobPipesIterator.java
@@ -88,7 +88,7 @@ public class AZBlobPipesIterator extends PipesIterator implements Initializable
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
index 44663a22e..cdc600b2b 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/main/java/org/apache/tika/pipes/pipesiterator/csv/CSVPipesIterator.java
@@ -114,7 +114,7 @@ public class CSVPipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         try (Reader reader = Files.newBufferedReader(csvPath, charset)) {
             Iterable<CSVRecord> records = CSVFormat.EXCEL.parse(reader);
             List<String> headers = new ArrayList<>();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
index dccc1f70c..e04dfa866 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-csv/src/test/java/TestCSVPipesIterator.java
@@ -44,7 +44,7 @@ public class TestCSVPipesIterator {
         Path p = get("test-simple.csv");
         CSVPipesIterator it = new CSVPipesIterator();
         it.setFetcherName("fsf");
-        it.setEmitterName("fse");
+        it.setEmitterPluginId("fse");
         it.setCsvPath(p);
         it.setFetchKeyColumn("fetchKey");
         int numConsumers = 2;
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
index 857e2ac0a..93129f375 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-gcs/src/main/java/org/apache/tika/pipes/pipesiterator/gcs/GCSPipesIterator.java
@@ -93,7 +93,7 @@ public class GCSPipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
index 39fc3fe45..c1cf0c998 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-jdbc/src/main/java/org/apache/tika/pipes/pipesiterator/jdbc/JDBCPipesIterator.java
@@ -141,7 +141,7 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         FetchEmitKeyIndices fetchEmitKeyIndices = null;
         List<String> headers = new ArrayList<>();
         int rowCount = 0;
@@ -343,11 +343,11 @@ public class JDBCPipesIterator extends PipesIterator implements Initializable {
             throw new TikaConfigException("If you specify a 'fetchKeyColumn', you must specify a 'fetcherPluginId'");
         }
 
-        if (StringUtils.isBlank(getEmitterName()) && !StringUtils.isBlank(emitKeyColumn)) {
+        if (StringUtils.isBlank(getEmitterPluginId()) && !StringUtils.isBlank(emitKeyColumn)) {
             throw new TikaConfigException("If you specify an 'emitKeyColumn', you must specify an 'emitterName'");
         }
 
-        if (StringUtils.isBlank(getEmitterName()) && StringUtils.isBlank(getFetcherName())) {
+        if (StringUtils.isBlank(getEmitterPluginId()) && StringUtils.isBlank(getFetcherName())) {
             LOGGER.warn("no fetcher or emitter specified?!");
         }
 
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
index 26474284a..1d95d3016 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-kafka/src/main/java/org/apache/tika/pipes/pipesiterator/kafka/KafkaPipesIterator.java
@@ -149,7 +149,7 @@ public class KafkaPipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
index a35a559fe..f399925c2 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-s3/src/main/java/org/apache/tika/pipes/pipesiterator/s3/S3PipesIterator.java
@@ -197,7 +197,7 @@ public class S3PipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
         long start = System.currentTimeMillis();
         int count = 0;
         HandlerConfig handlerConfig = getHandlerConfig();
diff --git a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
index a133e607d..956fa6604 100644
--- a/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
+++ b/tika-pipes/tika-pipes-iterators/tika-pipes-iterator-solr/src/main/java/org/apache/tika/pipes/pipesiterator/solr/SolrPipesIterator.java
@@ -175,7 +175,7 @@ public class SolrPipesIterator extends PipesIterator implements Initializable {
     @Override
     protected void enqueue() throws InterruptedException, IOException, 
TimeoutException {
         String fetcherPluginId = getFetcherName();
-        String emitterName = getEmitterName();
+        String emitterName = getEmitterPluginId();
 
         try (SolrClient solrClient = createSolrClient()) {
             int fileCount = 0;
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java
new file mode 100644
index 000000000..00fd6f151
--- /dev/null
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginConfigLoader.java
@@ -0,0 +1,34 @@
+package org.apache.tika.serialization;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.apache.tika.plugins.PluginConfig;
+import org.apache.tika.plugins.PluginConfigs;
+
+public class PluginConfigLoader {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    static {
+        SimpleModule module = new SimpleModule();
+        module.addSerializer(PluginConfig.class, new 
PluginsConfigSerializer());
+        OBJECT_MAPPER.registerModule(module);
+        OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, 
JsonAutoDetect.Visibility.ANY);
+    }
+
+    public static PluginConfigs load(InputStream is) throws IOException  {
+        try (Reader reader = new InputStreamReader(is, 
StandardCharsets.UTF_8)) {
+            return OBJECT_MAPPER.readValue(reader, PluginConfigs.class);
+        }
+    }
+
+}
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java
new file mode 100644
index 000000000..e773c01c9
--- /dev/null
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigDeserializer.java
@@ -0,0 +1,27 @@
+package org.apache.tika.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.tika.plugins.PluginConfig;
+
+public class PluginsConfigDeserializer extends JsonDeserializer<PluginConfig> {
+
+    @Override
+    public PluginConfig deserialize(JsonParser jsonParser, 
DeserializationContext deserializationContext) throws IOException, 
JacksonException {
+        JsonNode node = jsonParser.getCodec().readTree(jsonParser);
+
+        String pluginId = node.get("pluginId").asText();
+
+        JsonNode jsonConfigNode = node.get("jsonConfig");
+
+        String jsonConfigRaw = jsonConfigNode.toString();
+
+        return new PluginConfig(pluginId, jsonConfigRaw);
+    }
+}
diff --git 
a/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java
 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java
new file mode 100644
index 000000000..feadf4ead
--- /dev/null
+++ 
b/tika-serialization/src/main/java/org/apache/tika/serialization/PluginsConfigSerializer.java
@@ -0,0 +1,21 @@
+package org.apache.tika.serialization;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import org.apache.tika.plugins.PluginConfig;
+
+public class PluginsConfigSerializer extends JsonSerializer<PluginConfig> {
+
+    @Override
+    public void serialize(PluginConfig pluginsConfig, JsonGenerator 
jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+        jsonGenerator.writeStartObject();
+        jsonGenerator.writeStringField("pluginId", pluginsConfig.pluginId());
+        jsonGenerator.writeFieldName("jsonConfig");
+        jsonGenerator.writeRawValue(pluginsConfig.jsonConfig());
+        jsonGenerator.writeEndObject();
+    }
+}
diff --git 
a/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java
 
b/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java
new file mode 100644
index 000000000..56808006a
--- /dev/null
+++ 
b/tika-serialization/src/test/java/org/apache/tika/serialization/PluginsConfigTest.java
@@ -0,0 +1,69 @@
+package org.apache.tika.serialization;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.junit.jupiter.api.Test;
+
+import org.apache.tika.plugins.PluginConfig;
+import org.apache.tika.plugins.PluginConfigs;
+
+public class PluginsConfigTest {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    static {
+        SimpleModule module = new SimpleModule();
+        module.addDeserializer(PluginConfig.class, new 
PluginsConfigDeserializer());
+        module.addSerializer(PluginConfig.class, new 
PluginsConfigSerializer());
+        OBJECT_MAPPER.registerModule(module);
+        OBJECT_MAPPER.setVisibility(PropertyAccessor.FIELD, 
JsonAutoDetect.Visibility.ANY);
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+
+        PluginConfig p1 = new PluginConfig("pluginId",
+                """
+                        {"basePath":"/my/docs","includeSystemInfo":true}
+                        """);
+        String json = OBJECT_MAPPER.writeValueAsString(p1);
+
+        PluginConfig deserialized = OBJECT_MAPPER.readValue(json, 
PluginConfig.class);
+        assertEquals(p1.pluginId(), deserialized.pluginId());
+        assertEquals(flatten(p1.jsonConfig()), 
flatten(deserialized.jsonConfig()));
+    }
+
+    @Test
+    public void testMap() throws Exception {
+        PluginConfig p1 = new PluginConfig("pluginId1",
+                """
+                        {"basePath":"/my/docs1","includeSystemInfo":true}
+                        """);
+        PluginConfig p2 = new PluginConfig("pluginId2",
+                """
+                        {"basePath":"/my/docs2","includeSystemInfo":false}
+                        """);
+        Map<String, PluginConfig> map = new HashMap<>();
+        map.put(p1.pluginId(), p1);
+        map.put(p2.pluginId(), p2);
+        PluginConfigs pluginConfigManager = new PluginConfigs(map);
+
+        String json = OBJECT_MAPPER.writeValueAsString(pluginConfigManager);
+
+        PluginConfigs deserialized = OBJECT_MAPPER.readValue(json, 
PluginConfigs.class);
+        assertEquals(pluginConfigManager.get(p1.pluginId()).get().pluginId(), 
deserialized.get(p1.pluginId()).get().pluginId());
+        
assertEquals(flatten(pluginConfigManager.get(p1.pluginId()).get().jsonConfig()),
+                flatten(deserialized.get(p1.pluginId()).get().jsonConfig()));
+    }
+
+    private static String flatten(String s) {
+        return s.replaceAll("[\r\n]", "");
+    }
+}
diff --git 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
index efc5120fc..1ce5ffe05 100644
--- 
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
+++ 
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/resource/AsyncResource.java
@@ -46,7 +46,7 @@ import org.apache.tika.parser.ParseContext;
 import org.apache.tika.pipes.core.FetchEmitTuple;
 import org.apache.tika.pipes.core.async.AsyncProcessor;
 import org.apache.tika.pipes.core.async.OfferLargerThanQueueSize;
-import org.apache.tika.pipes.core.emitter.EmitData;
+import org.apache.tika.pipes.core.emitter.EmitDataImpl;
 import org.apache.tika.pipes.core.emitter.EmitterManager;
 import org.apache.tika.pipes.core.extractor.EmbeddedDocumentBytesConfig;
 import org.apache.tika.pipes.core.fetcher.FetchKey;
@@ -73,7 +73,7 @@ public class AsyncResource {
         return queue;
     }
 
-    public ArrayBlockingQueue<EmitData> getEmitDataQueue(int size) {
+    public ArrayBlockingQueue<EmitDataImpl> getEmitDataQueue(int size) {
         return new ArrayBlockingQueue<>(size);
     }
 

Reply via email to