This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch TIKA-4260
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/TIKA-4260 by this push:
new a6ee3816a TIKA-4260 -- add parseContext to emitters WIP -- do not merge
a6ee3816a is described below
commit a6ee3816ad4481758cdf2fcf5fe3150f92c35388
Author: tallison <[email protected]>
AuthorDate: Fri May 24 08:58:25 2024 -0400
TIKA-4260 -- add parseContext to emitters WIP -- do not merge
---
.../java/org/apache/tika/pipes/PipesClient.java | 5 +-
.../java/org/apache/tika/pipes/PipesServer.java | 55 +++++++---------------
.../org/apache/tika/pipes/async/AsyncEmitter.java | 2 +-
.../apache/tika/pipes/emitter/AbstractEmitter.java | 6 ++-
.../org/apache/tika/pipes/emitter/EmitData.java | 16 ++++++-
.../org/apache/tika/pipes/emitter/Emitter.java | 3 +-
.../apache/tika/pipes/emitter/EmptyEmitter.java | 3 +-
.../apache/tika/pipes/emitter/StreamEmitter.java | 3 +-
.../EmittingEmbeddedDocumentBytesHandler.java | 17 +++++--
.../apache/tika/pipes/fetcher/EmptyFetcher.java | 5 +-
.../org/apache/tika/pipes/fetcher/Fetcher.java | 5 +-
.../apache/tika/pipes/fetcher/RangeFetcher.java | 35 --------------
.../tika/pipes/fetcher/fs/FileSystemFetcher.java | 7 +--
.../apache/tika/pipes/fetcher/url/UrlFetcher.java | 6 ++-
.../org/apache/tika/pipes/async/MockEmitter.java | 5 +-
.../org/apache/tika/pipes/async/MockFetcher.java | 5 +-
.../org/apache/tika/pipes/emitter/MockEmitter.java | 3 +-
.../org/apache/tika/pipes/fetcher/MockFetcher.java | 5 +-
.../tika/pipes/s3/tests/PipeIntegrationTests.java | 6 +--
.../tika/pipes/emitter/azblob/AZBlobEmitter.java | 5 +-
.../tika/pipes/emitter/fs/FileSystemEmitter.java | 5 +-
.../apache/tika/pipes/emitter/gcs/GCSEmitter.java | 3 +-
.../apache/tika/pipes/emitter/s3/S3Emitter.java | 7 +--
.../tika/pipes/fetcher/http/HttpFetcher.java | 26 ++++------
.../apache/tika/pipes/fetcher/s3/S3Fetcher.java | 13 +++--
.../tika/server/core/FetcherStreamFactory.java | 1 -
26 files changed, 110 insertions(+), 142 deletions(-)
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
index 52e72df85..129067136 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.utils.ProcessUtils;
@@ -325,7 +326,7 @@ public class PipesClient implements Closeable {
case INTERMEDIATE_RESULT:
LOG.debug("pipesClientId={} intermediate success: {} in {}
ms", pipesClientId,
t.getId(), millis);
- return deserializeIntermediateResult(t.getEmitKey());
+ return deserializeIntermediateResult(t.getEmitKey(),
t.getParseContext());
case PARSE_SUCCESS:
//there may have been a parse exception, but the parse didn't
crash
LOG.debug("pipesClientId={} parse success: {} in {} ms",
pipesClientId, t.getId(),
@@ -383,7 +384,7 @@ public class PipesClient implements Closeable {
}
}
- private PipesResult deserializeIntermediateResult(EmitKey emitKey) throws
IOException {
+ private PipesResult deserializeIntermediateResult(EmitKey emitKey,
ParseContext parseContext) throws IOException {
int length = input.readInt();
byte[] bytes = new byte[length];
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
index ffa134aba..70adc700b 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java
@@ -71,10 +71,8 @@ import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig;
import org.apache.tika.pipes.extractor.EmittingEmbeddedDocumentBytesHandler;
-import org.apache.tika.pipes.fetcher.FetchKey;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetcher.RangeFetcher;
import org.apache.tika.sax.BasicContentHandlerFactory;
import org.apache.tika.sax.ContentHandlerFactory;
import org.apache.tika.sax.RecursiveParserWrapperHandler;
@@ -282,7 +280,7 @@ public class PipesServer implements Runnable {
private void emit(String taskId, EmitKey emitKey,
boolean isExtractEmbeddedBytes,
MetadataListAndEmbeddedBytes parseData,
- String parseExceptionStack) {
+ String parseExceptionStack, ParseContext parseContext) {
Emitter emitter = null;
try {
@@ -298,7 +296,7 @@ public class PipesServer implements Runnable {
parseData.toBePackagedForStreamEmitter()) {
emitContentsAndBytes(emitter, emitKey, parseData);
} else {
- emitter.emit(emitKey.getEmitKey(),
parseData.getMetadataList());
+ emitter.emit(emitKey.getEmitKey(),
parseData.getMetadataList(), parseContext);
}
} catch (IOException | TikaEmitterException e) {
LOG.warn("emit exception", e);
@@ -417,11 +415,11 @@ public class PipesServer implements Runnable {
if (embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes() &&
parseData.toBePackagedForStreamEmitter()) {
emit(t.getId(), emitKey,
embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(),
- parseData, stack);
+ parseData, stack, parseContext);
} else if (maxForEmitBatchBytes >= 0 &&
emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) {
emit(t.getId(), emitKey,
embeddedDocumentBytesConfig.isExtractEmbeddedDocumentBytes(),
- parseData, stack);
+ parseData, stack, parseContext);
} else {
//send back to the client
write(emitData);
@@ -460,35 +458,18 @@ public class PipesServer implements Runnable {
}
protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t,
Fetcher fetcher) {
- FetchKey fetchKey = t.getFetchKey();
- if (fetchKey.hasRange()) {
- if (!(fetcher instanceof RangeFetcher)) {
- throw new IllegalArgumentException(
- "fetch key has a range, but the fetcher is not a range
fetcher");
- }
- Metadata metadata = new Metadata();
- try (InputStream stream = ((RangeFetcher)
fetcher).fetch(fetchKey.getFetchKey(),
- fetchKey.getRangeStart(), fetchKey.getRangeEnd(),
metadata, t.getParseContext())) {
- return parseWithStream(t, stream, metadata);
- } catch (SecurityException e) {
- LOG.error("security exception " + t.getId(), e);
- throw e;
- } catch (TikaException | IOException e) {
- LOG.warn("fetch exception " + t.getId(), e);
- write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
- }
- } else {
- Metadata metadata = new Metadata();
- try (InputStream stream =
fetcher.fetch(t.getFetchKey().getFetchKey(), metadata, t.getParseContext())) {
- return parseWithStream(t, stream, metadata);
- } catch (SecurityException e) {
- LOG.error("security exception " + t.getId(), e);
- throw e;
- } catch (TikaException | IOException e) {
- LOG.warn("fetch exception " + t.getId(), e);
- write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
- }
+
+ Metadata metadata = new Metadata();
+ try (InputStream stream = fetcher.fetch(t)) {
+ return parseWithStream(t, stream, metadata);
+ } catch (SecurityException e) {
+ LOG.error("security exception " + t.getId(), e);
+ throw e;
+ } catch (TikaException | IOException e) {
+ LOG.warn("fetch exception " + t.getId(), e);
+ write(STATUS.FETCH_EXCEPTION, ExceptionUtils.getStackTrace(e));
}
+
return null;
}
@@ -578,8 +559,7 @@ public class PipesServer implements Runnable {
//TODO: especially clean this up.
if (!StringUtils.isBlank(embeddedDocumentBytesConfig.getEmitter())) {
parseContext.set(EmbeddedDocumentBytesHandler.class,
- new
EmittingEmbeddedDocumentBytesHandler(fetchEmitTuple.getEmitKey(),
- embeddedDocumentBytesConfig, emitterManager));
+ new EmittingEmbeddedDocumentBytesHandler(fetchEmitTuple,
emitterManager));
} else {
parseContext.set(EmbeddedDocumentBytesHandler.class,
new BasicEmbeddedDocumentBytesHandler(
@@ -707,8 +687,7 @@ public class PipesServer implements Runnable {
EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig =
parseContext.get(EmbeddedDocumentBytesConfig.class);
if (embeddedDocumentBytesConfig != null &&
embeddedDocumentBytesConfig.isIncludeOriginal()) {
- EmbeddedDocumentBytesHandler embeddedDocumentByteStore =
- parseContext.get(EmbeddedDocumentBytesHandler.class);
+ EmbeddedDocumentBytesHandler embeddedDocumentByteStore =
parseContext.get(EmbeddedDocumentBytesHandler.class);
try (InputStream is = Files.newInputStream(tis.getPath())) {
embeddedDocumentByteStore.add(0, metadata, is);
} catch (IOException e) {
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
index fce65c540..e0fe605f3 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncEmitter.java
@@ -42,7 +42,7 @@ import org.apache.tika.utils.ExceptionUtils;
*/
public class AsyncEmitter implements Callable<Integer> {
- static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null);
+ static final EmitData EMIT_DATA_STOP_SEMAPHORE = new EmitData(null, null,
null);
static final int EMITTER_FUTURE_CODE = 2;
private static final Logger LOG =
LoggerFactory.getLogger(AsyncEmitter.class);
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
index 648e0949d..4629f9263 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/AbstractEmitter.java
@@ -19,6 +19,8 @@ package org.apache.tika.pipes.emitter;
import java.io.IOException;
import java.util.List;
+import org.apache.tika.parser.ParseContext;
+
public abstract class AbstractEmitter implements Emitter {
private String name;
@@ -33,7 +35,7 @@ public abstract class AbstractEmitter implements Emitter {
}
/**
- * The default behavior is to call {@link #emit(String, List)} on each
item.
+ * The default behavior is to call {@link #emit(String, List,
ParseContext)} on each item.
* Some implementations, e.g. Solr/ES/vespa, can benefit from subclassing
this and
* emitting a bunch of docs at once.
*
@@ -44,7 +46,7 @@ public abstract class AbstractEmitter implements Emitter {
@Override
public void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException {
for (EmitData d : emitData) {
- emit(d.getEmitKey().getEmitKey(), d.getMetadataList());
+ emit(d.getEmitKey().getEmitKey(), d.getMetadataList(),
d.getParseContext());
}
}
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
index 95376a9fa..09d448adf 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmitData.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import java.util.List;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.utils.StringUtils;
public class EmitData implements Serializable {
@@ -30,18 +31,23 @@ public class EmitData implements Serializable {
private final EmitKey emitKey;
private final List<Metadata> metadataList;
-
private final String containerStackTrace;
+ private ParseContext parseContext;
public EmitData(EmitKey emitKey, List<Metadata> metadataList) {
this(emitKey, metadataList, StringUtils.EMPTY);
}
public EmitData(EmitKey emitKey, List<Metadata> metadataList, String
containerStackTrace) {
+ this(emitKey, metadataList, containerStackTrace, new ParseContext());
+ }
+
+ public EmitData(EmitKey emitKey, List<Metadata> metadataList, String
containerStackTrace, ParseContext parseContext) {
this.emitKey = emitKey;
this.metadataList = metadataList;
this.containerStackTrace = (containerStackTrace == null) ?
StringUtils.EMPTY :
containerStackTrace;
+ this.parseContext = (parseContext == null) ? new ParseContext() : parseContext;
}
public EmitKey getEmitKey() {
@@ -60,6 +66,14 @@ public class EmitData implements Serializable {
return estimateSizeInBytes(getEmitKey().getEmitKey(),
getMetadataList(), containerStackTrace);
}
+ public void setParseContext(ParseContext parseContext) {
+ this.parseContext = parseContext;
+ }
+
+ public ParseContext getParseContext() {
+ return parseContext;
+ }
+
private static long estimateSizeInBytes(String id, List<Metadata>
metadataList,
String containerStackTrace) {
long sz = 36 + id.length() * 2;
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
index f60ef3b77..c748541af 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/Emitter.java
@@ -20,12 +20,13 @@ import java.io.IOException;
import java.util.List;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
public interface Emitter {
String getName();
- void emit(String emitKey, List<Metadata> metadataList) throws IOException,
TikaEmitterException;
+ void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext) throws IOException, TikaEmitterException;
void emit(List<? extends EmitData> emitData) throws IOException,
TikaEmitterException;
//TODO -- add this later for xhtml?
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
index b77107ba0..ef7adbef5 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/EmptyEmitter.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
public class EmptyEmitter implements Emitter {
@@ -29,7 +30,7 @@ public class EmptyEmitter implements Emitter {
}
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
index 10526eb0e..30249b877 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/emitter/StreamEmitter.java
@@ -20,8 +20,9 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
public interface StreamEmitter extends Emitter {
- void emit(String emitKey, InputStream inputStream, Metadata userMetadata)
+ void emit(String emitKey, InputStream inputStream, Metadata userMetadata,
ParseContext parseContext)
throws IOException, TikaEmitterException;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
index 1132a4bc6..9c73578f0 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmittingEmbeddedDocumentBytesHandler.java
@@ -25,6 +25,8 @@ import org.apache.commons.io.IOExceptionWithCause;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.extractor.AbstractEmbeddedDocumentBytesHandler;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.emitter.EmitKey;
import org.apache.tika.pipes.emitter.Emitter;
import org.apache.tika.pipes.emitter.EmitterManager;
@@ -37,11 +39,17 @@ public class EmittingEmbeddedDocumentBytesHandler extends
AbstractEmbeddedDocume
private final StreamEmitter emitter;
private static final Metadata METADATA = new Metadata();
- public EmittingEmbeddedDocumentBytesHandler(EmitKey containerEmitKey,
- EmbeddedDocumentBytesConfig
embeddedDocumentBytesConfig,
+ private final ParseContext containerParseContext;
+
+ public EmittingEmbeddedDocumentBytesHandler(FetchEmitTuple fetchEmitTuple,
EmitterManager emitterManager)
throws TikaConfigException {
- this.containerEmitKey = containerEmitKey;
- this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig;
+
+ this.containerEmitKey = fetchEmitTuple.getEmitKey();
+ this.containerParseContext = fetchEmitTuple.getParseContext();
+ this.embeddedDocumentBytesConfig =
containerParseContext.get(EmbeddedDocumentBytesConfig.class);
+ if (this.embeddedDocumentBytesConfig == null) {
+ throw new TikaConfigException("EmbeddedDocumentBytesConfig must
not be null!");
+ }
Emitter tmpEmitter =
emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter());
if (! (tmpEmitter instanceof StreamEmitter)) {
@@ -58,7 +66,7 @@ public class EmittingEmbeddedDocumentBytesHandler extends
AbstractEmbeddedDocume
String emitKey = getEmitKey(containerEmitKey.getEmitKey(),
id, embeddedDocumentBytesConfig, metadata);
try {
- emitter.emit(emitKey, inputStream, METADATA);
+ emitter.emit(emitKey, inputStream, METADATA, containerParseContext);
} catch (TikaEmitterException e) {
throw new IOExceptionWithCause(e);
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
index d64f81524..13ebb894a 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/EmptyFetcher.java
@@ -20,8 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
public class EmptyFetcher implements Fetcher {
@@ -31,7 +30,7 @@ public class EmptyFetcher implements Fetcher {
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws TikaException, IOException {
+ public InputStream fetch(FetchEmitTuple t) throws TikaException,
IOException {
return null;
}
}
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
index 8f7a186fd..0e1a41735 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/Fetcher.java
@@ -20,8 +20,7 @@ import java.io.IOException;
import java.io.InputStream;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
/**
* Interface for an object that will fetch an InputStream given
@@ -34,5 +33,5 @@ public interface Fetcher {
String getName();
- InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws TikaException, IOException;
+ InputStream fetch(FetchEmitTuple t) throws TikaException, IOException;
}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
deleted file mode 100644
index 7754cb87c..000000000
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/RangeFetcher.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tika.pipes.fetcher;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
-
-/**
- * This class extracts a range of bytes from a given fetch key.
- */
-public interface RangeFetcher extends Fetcher {
- //At some point, Tika 3.x?, we may want to add optional ranges to the
fetchKey?
-
- InputStream fetch(String fetchKey, long startOffset, long endOffset,
Metadata metadata, ParseContext parseContext)
- throws TikaException, IOException;
-
-}
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
index ce464dd14..eedb059d3 100644
---
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
+++
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/fs/FileSystemFetcher.java
@@ -41,7 +41,7 @@ import org.apache.tika.metadata.FileSystem;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.Property;
import org.apache.tika.metadata.TikaCoreProperties;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
public class FileSystemFetcher extends AbstractFetcher implements
Initializable {
@@ -59,8 +59,9 @@ public class FileSystemFetcher extends AbstractFetcher
implements Initializable
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws IOException, TikaException {
-
+ public InputStream fetch(FetchEmitTuple t) throws IOException,
TikaException {
+ String fetchKey = t.getFetchKey().getFetchKey();
+ Metadata metadata = t.getMetadata();
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("Path must not contain 'u0000'.
" +
"Please review the life decisions that led you to
requesting " +
diff --git
a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
index 7692516cd..ef967d1ee 100644
--- a/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
+++ b/tika-core/src/main/java/org/apache/tika/pipes/fetcher/url/UrlFetcher.java
@@ -24,7 +24,7 @@ import java.util.Locale;
import org.apache.tika.exception.TikaException;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
/**
@@ -36,7 +36,9 @@ import org.apache.tika.pipes.fetcher.AbstractFetcher;
public class UrlFetcher extends AbstractFetcher {
@Override
- public InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws IOException, TikaException {
+ public InputStream fetch(FetchEmitTuple t) throws IOException,
TikaException {
+ String fetchKey = t.getFetchKey().getFetchKey();
+ Metadata metadata = t.getMetadata();
if (fetchKey.contains("\u0000")) {
throw new IllegalArgumentException("URL must not contain \u0000. "
+
"Please review the life decisions that led you to
requesting " +
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
index 2374c1474..b940ec7be 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockEmitter.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.EmitData;
import org.apache.tika.pipes.emitter.EmitKey;
@@ -40,11 +41,11 @@ public class MockEmitter extends AbstractEmitter {
}
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
emit(
Collections.singletonList(new EmitData(new EmitKey(getName(),
emitKey),
- metadataList)));
+ metadataList, null, parseContext)));
}
@Override
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
index acb533ece..c43b8d82a 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockFetcher.java
@@ -22,8 +22,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.fetcher.Fetcher;
public class MockFetcher implements Fetcher {
@@ -38,7 +37,7 @@ public class MockFetcher implements Fetcher {
}
@Override
- public InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws TikaException, IOException {
+ public InputStream fetch(FetchEmitTuple fetchEmitTuple) throws
TikaException, IOException {
return new ByteArrayInputStream(BYTES);
}
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
index 036a95965..89c3c2ce4 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/emitter/MockEmitter.java
@@ -26,6 +26,7 @@ import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.ParseContext;
public class MockEmitter extends AbstractEmitter implements Initializable {
@@ -52,7 +53,7 @@ public class MockEmitter extends AbstractEmitter implements
Initializable {
}
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
}
diff --git
a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
index e9104e0a8..9a96a9c25 100644
--- a/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
+++ b/tika-core/src/test/java/org/apache/tika/pipes/fetcher/MockFetcher.java
@@ -28,8 +28,7 @@ import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
-import org.apache.tika.metadata.Metadata;
-import org.apache.tika.parser.ParseContext;
+import org.apache.tika.pipes.FetchEmitTuple;
public class MockFetcher extends AbstractFetcher implements Initializable {
@@ -65,7 +64,7 @@ public class MockFetcher extends AbstractFetcher implements
Initializable {
@Override
- public InputStream fetch(String fetchKey, Metadata metadata, ParseContext
parseContext) throws TikaException, IOException {
+ public InputStream fetch(FetchEmitTuple fetchEmitTuple) throws
TikaException, IOException {
return byteString == null ? new ByteArrayInputStream(new byte[0]) :
new
ByteArrayInputStream(byteString.getBytes(StandardCharsets.UTF_8));
}
diff --git
a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
index 495bcec17..7b9544f1e 100644
---
a/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
+++
b/tika-integration-tests/tika-pipes-s3-integration-tests/src/test/java/org/apache/tika/pipes/s3/tests/PipeIntegrationTests.java
@@ -198,7 +198,7 @@ public class PipeIntegrationTests {
if (Files.isRegularFile(targ)) {
return;
}
- try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(),
t.getMetadata())) {
+ try (InputStream is = fetcher.fetch(t)) {
System.out.println(counter.getAndIncrement() + " : " + t);
Files.createDirectories(targ.getParent());
Files.copy(is, targ);
@@ -239,8 +239,8 @@ public class PipeIntegrationTests {
Metadata userMetadata = new Metadata();
userMetadata.set("project", "my-project");
- try (InputStream is = fetcher.fetch(t.getFetchKey().getFetchKey(),
t.getMetadata())) {
- emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata);
+ try (InputStream is = fetcher.fetch(t)) {
+ emitter.emit(t.getEmitKey().getEmitKey(), is, userMetadata,
t.getParseContext());
}
}
}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
index 180bc204a..e4b1e8ae4 100644
---
a/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-az-blob/src/main/java/org/apache/tika/pipes/emitter/azblob/AZBlobEmitter.java
@@ -48,6 +48,7 @@ import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -80,7 +81,7 @@ public class AZBlobEmitter extends AbstractEmitter implements
Initializable, Str
* @throws TikaException
*/
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or
of size 0");
@@ -109,7 +110,7 @@ public class AZBlobEmitter extends AbstractEmitter
implements Initializable, Str
* @throws TikaEmitterException or IOexception if there is a Runtime
client exception
*/
@Override
- public void emit(String path, InputStream is, Metadata userMetadata)
+ public void emit(String path, InputStream is, Metadata userMetadata,
ParseContext parseContext)
throws IOException, TikaEmitterException {
String lengthString = userMetadata.get(Metadata.CONTENT_LENGTH);
long length = -1;
diff --git
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
index a90c5e509..c219bcb49 100644
---
a/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-fs/src/main/java/org/apache/tika/pipes/emitter/fs/FileSystemEmitter.java
@@ -31,6 +31,7 @@ import org.apache.tika.config.Field;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -73,7 +74,7 @@ public class FileSystemEmitter extends AbstractEmitter
implements StreamEmitter
private boolean prettyPrint = false;
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
Path output;
if (metadataList == null || metadataList.size() == 0) {
@@ -144,7 +145,7 @@ public class FileSystemEmitter extends AbstractEmitter
implements StreamEmitter
}
@Override
- public void emit(String path, InputStream inputStream, Metadata
userMetadata)
+ public void emit(String path, InputStream inputStream, Metadata
userMetadata, ParseContext parseContext)
throws IOException, TikaEmitterException {
Path target = basePath.resolve(path);
diff --git
a/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java
index 9121190d6..1e2f9825d 100644
---
a/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-gcs/src/main/java/org/apache/tika/pipes/emitter/gcs/GCSEmitter.java
@@ -46,6 +46,7 @@ import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -69,7 +70,7 @@ public class GCSEmitter extends AbstractEmitter implements
Initializable, Stream
* @throws TikaException
*/
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or
of size 0");
diff --git
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
index fe2f27aed..5d0d88445 100644
---
a/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-s3/src/main/java/org/apache/tika/pipes/emitter/s3/S3Emitter.java
@@ -57,6 +57,7 @@ import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.TikaCoreProperties;
import org.apache.tika.metadata.serialization.JsonMetadataList;
+import org.apache.tika.parser.ParseContext;
import org.apache.tika.pipes.emitter.AbstractEmitter;
import org.apache.tika.pipes.emitter.StreamEmitter;
import org.apache.tika.pipes.emitter.TikaEmitterException;
@@ -120,7 +121,7 @@ public class S3Emitter extends AbstractEmitter implements
Initializable, StreamE
* @throws TikaException
*/
@Override
- public void emit(String emitKey, List<Metadata> metadataList)
+ public void emit(String emitKey, List<Metadata> metadataList, ParseContext
parseContext)
throws IOException, TikaEmitterException {
if (metadataList == null || metadataList.size() == 0) {
throw new TikaEmitterException("metadata list must not be null or
of size 0");
@@ -148,7 +149,7 @@ public class S3Emitter extends AbstractEmitter implements
Initializable, StreamE
throw new TikaEmitterException("can't jsonify", e);
}
try (InputStream is = TikaInputStream.get(tmpPath)) {
- emit(emitKey, is, new Metadata());
+ emit(emitKey, is, new Metadata(), parseContext);
}
}
}
@@ -161,7 +162,7 @@ public class S3Emitter extends AbstractEmitter implements
Initializable, StreamE
* @throws TikaEmitterException or IOexception if there is a Runtime s3
client exception
*/
@Override
- public void emit(String path, InputStream is, Metadata userMetadata)
+ public void emit(String path, InputStream is, Metadata userMetadata,
ParseContext parseContext)
throws IOException, TikaEmitterException {
if (!StringUtils.isBlank(prefix)) {
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
index 26b45f8bf..6c9a76053 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-http/src/main/java/org/apache/tika/pipes/fetcher/http/HttpFetcher.java
@@ -60,15 +60,14 @@ import org.apache.tika.config.Initializable;
import org.apache.tika.config.InitializableProblemHandler;
import org.apache.tika.config.Param;
import org.apache.tika.exception.TikaConfigException;
-import org.apache.tika.exception.TikaException;
import org.apache.tika.exception.TikaTimeoutException;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.metadata.Property;
import org.apache.tika.metadata.TikaCoreProperties;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.fetcher.RangeFetcher;
import org.apache.tika.utils.StringUtils;
/**
@@ -134,29 +133,24 @@ public class HttpFetcher extends AbstractFetcher
implements Initializable, Range
//By default httpclient adds e.g. "Apache-HttpClient/4.5.13 (Java/x.y.z)"
private String userAgent = null;
-
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
IOException, TikaException {
+ public InputStream fetch(FetchEmitTuple fetchEmitTuple)
+ throws IOException {
+ String fetchKey = fetchEmitTuple.getFetchKey().getFetchKey();
+ Metadata metadata = fetchEmitTuple.getMetadata();
HttpGet get = new HttpGet(fetchKey);
RequestConfig requestConfig =
RequestConfig.custom()
- .setMaxRedirects(maxRedirects)
- .setRedirectsEnabled(true).build();
+ .setMaxRedirects(maxRedirects)
+ .setRedirectsEnabled(true).build();
get.setConfig(requestConfig);
if (! StringUtils.isBlank(userAgent)) {
get.setHeader(USER_AGENT, userAgent);
}
- return execute(get, metadata, httpClient, true);
- }
-
- @Override
- public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata metadata)
- throws IOException {
- HttpGet get = new HttpGet(fetchKey);
- if (! StringUtils.isBlank(userAgent)) {
- get.setHeader(USER_AGENT, userAgent);
+ if (fetchEmitTuple.getFetchKey().hasRange()) {
+ get.setHeader("Range", "bytes=" +
fetchEmitTuple.getFetchKey().getRangeStart() + "-"
+ + fetchEmitTuple.getFetchKey().getRangeEnd());
}
- get.setHeader("Range", "bytes=" + startRange + "-" + endRange);
return execute(get, metadata, httpClient, true);
}
diff --git
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
index b57c361b9..908006dfa 100644
---
a/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
+++
b/tika-pipes/tika-fetchers/tika-fetcher-s3/src/main/java/org/apache/tika/pipes/fetcher/s3/S3Fetcher.java
@@ -55,8 +55,8 @@ import org.apache.tika.io.FilenameUtils;
import org.apache.tika.io.TemporaryResources;
import org.apache.tika.io.TikaInputStream;
import org.apache.tika.metadata.Metadata;
+import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.fetcher.AbstractFetcher;
-import org.apache.tika.pipes.fetcher.RangeFetcher;
import org.apache.tika.utils.StringUtils;
/**
@@ -106,13 +106,12 @@ public class S3Fetcher extends AbstractFetcher implements
Initializable, RangeFe
private boolean pathStyleAccessEnabled = false;
@Override
- public InputStream fetch(String fetchKey, Metadata metadata) throws
TikaException, IOException {
- return fetch(fetchKey, -1, -1, metadata);
- }
-
- @Override
- public InputStream fetch(String fetchKey, long startRange, long endRange,
Metadata metadata)
+ public InputStream fetch(FetchEmitTuple fetchEmitTuple)
throws TikaException, IOException {
+ String fetchKey = fetchEmitTuple.getFetchKey().getFetchKey();
+ long startRange = fetchEmitTuple.getFetchKey().getRangeStart();
+ long endRange = fetchEmitTuple.getFetchKey().getRangeEnd();
+ Metadata metadata = fetchEmitTuple.getMetadata();
String theFetchKey = StringUtils.isBlank(prefix) ? fetchKey : prefix +
fetchKey;
if (LOGGER.isDebugEnabled()) {
diff --git
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
index f173808d6..c30c0f951 100644
---
a/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
+++
b/tika-server/tika-server-core/src/main/java/org/apache/tika/server/core/FetcherStreamFactory.java
@@ -33,7 +33,6 @@ import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.pipes.fetcher.Fetcher;
import org.apache.tika.pipes.fetcher.FetcherManager;
-import org.apache.tika.pipes.fetcher.RangeFetcher;
/**
* This class looks for "fetcherName" in the http header. If it is
not null