Copilot commented on code in PR #3670:
URL: https://github.com/apache/solr/pull/3670#discussion_r2426770023
##########
solr/modules/extraction/src/java/org/apache/solr/handler/extraction/ExtractingRequestHandler.java:
##########
@@ -81,25 +64,117 @@ public void inform(SolrCore core) {
parseContextConfig =
new ParseContextConfig(core.getResourceLoader(), parseContextConfigLoc);
}
+
+ // Always create local backend
+ this.localBackend = new LocalTikaExtractionBackend(core, tikaConfigLoc, parseContextConfig);
+
+ // Optionally create Tika Server backend if URL configured
+ String tikaServerUrl = (String) initArgs.get(ExtractingParams.TIKASERVER_URL);
+ if (tikaServerUrl != null) {
+ int timeoutSecs = 0;
+ Object initTimeout = initArgs.get(ExtractingParams.TIKASERVER_TIMEOUT_SECS);
+ if (initTimeout != null) {
+ try {
+ timeoutSecs = Integer.parseInt(String.valueOf(initTimeout));
+ } catch (NumberFormatException nfe) {
+ throw new SolrException(
+ ErrorCode.SERVER_ERROR,
+ "Invalid value for '"
+ + ExtractingParams.TIKASERVER_TIMEOUT_SECS
+ + "': "
+ + initTimeout,
+ nfe);
+ }
+ }
+ Object maxCharsObj = initArgs.get(ExtractingParams.TIKASERVER_MAX_CHARS);
+ long maxCharsLimit = 1024 * 1024 * 1024;
+ ;
Review Comment:
Remove the extra semicolon on line 91. It is a stray empty statement left over from
editing and makes the formatting inconsistent.
```suggestion
```
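For reference, once the suggestion above is applied the surrounding lines would read
roughly as below (a sketch reconstructed from the hunk; indentation is approximate and
the later use of `maxCharsObj` is elided):
```java
// Read the optional response-size cap from the handler's init args
Object maxCharsObj = initArgs.get(ExtractingParams.TIKASERVER_MAX_CHARS);
long maxCharsLimit = 1024 * 1024 * 1024; // default cap: 1 GiB
```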
##########
solr/modules/extraction/src/java/org/apache/solr/handler/extraction/TikaServerExtractionBackend.java:
##########
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.extraction;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.tika.sax.BodyContentHandler;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.InputStreamRequestContent;
+import org.eclipse.jetty.client.InputStreamResponseListener;
+import org.eclipse.jetty.client.Request;
+import org.eclipse.jetty.client.Response;
+import org.eclipse.jetty.io.EofException;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
+import org.xml.sax.helpers.DefaultHandler;
+
+/** Extraction backend using the Tika Server. It uses a shared Jetty HttpClient. */
+public class TikaServerExtractionBackend implements ExtractionBackend {
+ private static volatile HttpClient SHARED_CLIENT;
+ private static volatile ExecutorService SHARED_EXECUTOR;
+ private static final Object INIT_LOCK = new Object();
+ private static volatile boolean INITIALIZED = false;
+ private static volatile boolean SHUTDOWN = false;
+ private final String baseUrl;
+ private static final int DEFAULT_TIMEOUT_SECONDS = 3 * 60;
+ private final Duration defaultTimeout;
+ private final TikaServerParser tikaServerResponseParser = new TikaServerParser();
+ private boolean tikaMetadataCompatibility;
+ private HashMap<String, Object> initArgsMap = new HashMap<>();
+ private final long maxCharsLimit;
+
+ public TikaServerExtractionBackend(String baseUrl) {
+ this(baseUrl, DEFAULT_TIMEOUT_SECONDS, null, 100 * 1024 * 1024);
+ }
+
+ public TikaServerExtractionBackend(
+ String baseUrl, int timeoutSeconds, NamedList<?> initArgs, long maxCharsLimit) {
+ this.maxCharsLimit = maxCharsLimit;
+ if (initArgs != null) {
+ initArgs.toMap(this.initArgsMap);
+ }
+ Object metaCompatObh = this.initArgsMap.get(ExtractingParams.TIKASERVER_METADATA_COMPATIBILITY);
+ if (metaCompatObh != null) {
+ this.tikaMetadataCompatibility = Boolean.parseBoolean(metaCompatObh.toString());
+ }
+ if (timeoutSeconds <= 0) {
+ timeoutSeconds = DEFAULT_TIMEOUT_SECONDS;
+ }
+ if (baseUrl.endsWith("/")) {
+ this.baseUrl = baseUrl.substring(0, baseUrl.length() - 1);
+ } else {
+ this.baseUrl = baseUrl;
+ }
+ this.defaultTimeout =
+ Duration.ofSeconds(timeoutSeconds > 0 ? timeoutSeconds : DEFAULT_TIMEOUT_SECONDS);
+ }
+
+ public static final String NAME = "tikaserver";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public ExtractionResult extract(InputStream inputStream, ExtractionRequest request)
+ throws Exception {
+ try (InputStream tikaResponse = callTikaServer(inputStream, request)) {
+ ExtractionMetadata md = buildMetadataFromRequest(request);
+ BodyContentHandler bodyContentHandler = new BodyContentHandler(-1);
+ if (request.tikaServerRecursive) {
+ tikaServerResponseParser.parseRmetaJson(tikaResponse, bodyContentHandler, md);
+ } else {
+ tikaServerResponseParser.parseXml(tikaResponse, bodyContentHandler, md);
+ }
+ if (tikaMetadataCompatibility) {
+ appendBackCompatTikaMetadata(md);
+ }
+ return new ExtractionResult(bodyContentHandler.toString(), md);
+ }
+ }
+
+ @Override
+ public void extractWithSaxHandler(
+ InputStream inputStream,
+ ExtractionRequest request,
+ ExtractionMetadata md,
+ DefaultHandler saxContentHandler)
+ throws Exception {
+ try (InputStream tikaResponse = callTikaServer(inputStream, request)) {
+ if (request.tikaServerRecursive) {
+ tikaServerResponseParser.parseRmetaJson(tikaResponse, saxContentHandler, md);
+ } else {
+ tikaServerResponseParser.parseXml(tikaResponse, saxContentHandler, md);
+ }
+ if (tikaMetadataCompatibility) {
+ appendBackCompatTikaMetadata(md);
+ }
+ }
+ }
+
+ /**
+ * Call the Tika Server to extract text and metadata. Depending on
+ * <code>request.tikaServerRecursive</code>, will either return XML (false) or a JSON array
+ * (true). <b>The recursive mode consumes more memory both on the TikaServer side and on the
+ * Solr side.</b>
+ *
+ * @return InputStream of the response body, either XML or JSON depending on
+ * <code>request.tikaServerRecursive</code>
+ */
InputStream callTikaServer(InputStream inputStream, ExtractionRequest request) throws Exception {
+ String url = baseUrl + (request.tikaServerRecursive ? "/rmeta" : "/tika");
+
+ ensureClientInitialized();
+ HttpClient client = SHARED_CLIENT;
+
+ Request req = client.newRequest(url).method("PUT");
+ Duration effectiveTimeout =
+ (request.tikaServerTimeoutSeconds != null && request.tikaServerTimeoutSeconds > 0)
+ ? Duration.ofSeconds(request.tikaServerTimeoutSeconds)
+ : defaultTimeout;
+ req.timeout(effectiveTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
+ // Headers
+ String accept = (request.tikaServerRecursive ? "application/json" : "text/xml");
+ req.headers(h -> h.add("Accept", accept));
+ String contentType = (request.streamType != null) ? request.streamType : request.contentType;
+ if (contentType != null) {
+ req.headers(h -> h.add("Content-Type", contentType));
+ }
+ if (!request.tikaServerRequestHeaders.isEmpty()) {
+ req.headers(
+ h ->
+ request.tikaServerRequestHeaders.forEach(
+ (k, v) -> {
+ if (k != null && v != null) h.add(k, v);
+ }));
+ }
+
+ ExtractionMetadata md = buildMetadataFromRequest(request);
+ if (request.resourcePassword != null || request.passwordsMap != null) {
+ RegexRulesPasswordProvider passwordProvider = new RegexRulesPasswordProvider();
+ if (request.resourcePassword != null) {
+ passwordProvider.setExplicitPassword(request.resourcePassword);
+ }
+ if (request.passwordsMap != null) {
+ passwordProvider.setPasswordMap(request.passwordsMap);
+ }
+ String pwd = passwordProvider.getPassword(md);
+ if (pwd != null) {
+ req.headers(h -> h.add("Password", pwd)); // Tika Server expects this
header if provided
+ }
+ }
+ if (request.resourceName != null) {
+ req.headers(
+ h ->
+ h.add(
+ "Content-Disposition", "attachment; filename=\"" +
request.resourceName + "\""));
+ }
+
+ if (contentType != null) {
+ req.body(new InputStreamRequestContent(contentType, inputStream));
+ } else {
+ req.body(new InputStreamRequestContent(inputStream));
+ }
+
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ req.send(listener);
+
+ final Response response;
+ try {
response = listener.get(effectiveTimeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException te) {
+ throw new SolrException(
+ SolrException.ErrorCode.GATEWAY_TIMEOUT,
+ "Timeout after "
+ + effectiveTimeout.toMillis()
+ + " ms while waiting for response from TikaServer "
+ + url,
+ te);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Interrupted while waiting for response from TikaServer " + url,
+ ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof ConnectException
+ || cause instanceof SocketTimeoutException
+ || cause instanceof EofException
+ || cause instanceof ClosedChannelException) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVICE_UNAVAILABLE,
+ "Error communicating with TikaServer "
+ + url
+ + ": "
+ + cause.getClass().getSimpleName()
+ + ": "
+ + cause.getMessage(),
+ cause);
+ }
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Unexpected error while calling TikaServer " + url,
+ ee);
+ }
+
+ int code = response.getStatus();
+ if (code < 200 || code >= 300) {
SolrException.ErrorCode errorCode = SolrException.ErrorCode.getErrorCode(code);
+ String reason = response.getReason();
+ String msg =
+ "TikaServer "
+ + url
+ + " returned status "
+ + code
+ + (reason != null ? " (" + reason + ")" : "");
+ throw new SolrException(errorCode, msg);
+ }
+
+ InputStream responseStream = listener.getInputStream();
+ // Bound the amount of data we read from Tika Server to avoid excessive memory/CPU usage
+ return new LimitingInputStream(responseStream, maxCharsLimit);
+ }
+
+ private static class LimitingInputStream extends InputStream {
+ private final InputStream in;
+ private final long max;
+ private long count;
+
+ LimitingInputStream(InputStream in, long max) {
+ this.in = in;
+ this.max = max;
+ this.count = 0L;
+ }
+
+ private void checkLimit(long toAdd) {
+ if (max <= 0) return; // non-positive means unlimited
+ long newCount = count + toAdd;
+ if (newCount > max) {
+ throw new SolrException(
+ SolrException.ErrorCode.BAD_REQUEST,
+ "TikaServer response exceeded the configured maximum size of " +
max + " bytes");
+ }
+ count = newCount;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int b = in.read();
+ if (b != -1) {
+ checkLimit(1);
+ }
+ return b;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int n = in.read(b, off, len);
+ if (n > 0) {
+ checkLimit(n);
+ }
+ return n;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long skipped = in.skip(n);
+ if (skipped > 0) {
+ checkLimit(skipped);
+ }
+ return skipped;
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return in.available();
+ }
+ }
+
+ private static void ensureClientInitialized() {
+ if (INITIALIZED) return;
+ synchronized (INIT_LOCK) {
+ if (INITIALIZED) return;
+ ThreadFactory tf = new SolrNamedThreadFactory("TikaServerHttpClient");
+ ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool(tf);
+ HttpClient client = new HttpClient();
+ client.setExecutor(exec);
+ client.setScheduler(new ScheduledExecutorScheduler("TikaServerHttpClient-scheduler", true));
+ try {
+ client.start();
+ } catch (Exception e) {
+ try {
+ exec.shutdownNow();
+ } catch (Throwable ignore) {
+ }
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR, "Failed to start shared
Jetty HttpClient", e);
+ }
+ SHARED_EXECUTOR = exec;
+ SHARED_CLIENT = client;
+ INITIALIZED = true;
+ SHUTDOWN = false;
+ }
+ }
+
+ private final Map<String, String> fieldMappings = new LinkedHashMap<>();
+
+ // TODO: Improve backward compatibility by adding more mappings
Review Comment:
This TODO comment indicates incomplete functionality. Consider either
implementing the additional mappings or documenting the current limitations
more clearly in the code comments.
```suggestion
/*
* Backward compatibility mappings for Tika metadata fields.
* Only the following mappings are currently supported:
* dc:title -> title
* dc:creator -> author
* dc:description -> description
* dc:subject -> subject
* dc:language -> language
* dc:publisher -> publisher
* dcterms:created -> created
* dcterms:modified -> modified
* meta:author -> Author
* meta:creation-date -> Creation-Date
* meta:save-date -> Last-Save-Date
* meta:keyword -> Keywords
* pdf:docinfo:keywords -> Keywords
* If additional mappings are required for backward compatibility,
* they must be added here.
*/
```
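If the mappings stay hard-coded, one way to make the limitation explicit is to build
them in a small helper right next to this comment. A minimal sketch, assuming the
`fieldMappings` field declared in the diff above and using only the pairs listed in the
suggestion (the helper name is hypothetical):
```java
// Hypothetical helper: returns exactly the back-compat mappings documented above.
private static Map<String, String> defaultBackCompatMappings() {
  Map<String, String> m = new LinkedHashMap<>(); // preserves insertion order
  m.put("dc:title", "title");
  m.put("dc:creator", "author");
  m.put("dc:description", "description");
  m.put("dc:subject", "subject");
  m.put("dc:language", "language");
  m.put("dc:publisher", "publisher");
  m.put("dcterms:created", "created");
  m.put("dcterms:modified", "modified");
  m.put("meta:author", "Author");
  m.put("meta:creation-date", "Creation-Date");
  m.put("meta:save-date", "Last-Save-Date");
  m.put("meta:keyword", "Keywords");
  m.put("pdf:docinfo:keywords", "Keywords");
  return m;
}
```
The constructor could then call `fieldMappings.putAll(defaultBackCompatMappings());`,
keeping the supported set in one auditable place.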
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]