Author: bob
Date: Thu Oct 15 02:08:04 2015
New Revision: 1708723
URL: http://svn.apache.org/viewvc?rev=1708723&view=rev
Log:
TIKA-1762 - Create a Configurable ExecutorService in TikaConfig
Added:
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
Modified:
tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java
Added:
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
(added)
+++
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * An {@link ExecutorService} that additionally exposes setters for its core
+ * and maximum thread pool sizes, so the pool can be resized after it has been
+ * constructed. In this change the setters are called by {@code TikaConfig}
+ * when it reads the {@code core-threads} and {@code max-threads}
+ * configuration elements.
+ *
+ * @since Apache Tika 1.11
+ */
+public interface ConfigurableThreadPoolExecutor extends ExecutorService {
+
+ /**
+ * Sets the maximum pool size.
+ *
+ * @param threads the new maximum number of threads
+ */
+ public void setMaximumPoolSize(int threads);
+
+ /**
+ * Sets the core pool size.
+ *
+ * @param threads the new core number of threads
+ */
+ public void setCorePoolSize(int threads);
+
+}
Added:
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
(added)
+++
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.concurrent;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Simple Thread Pool Executor
+ *
+ * @since Apache Tika 1.11
+ */
+public class SimpleThreadPoolExecutor extends ThreadPoolExecutor implements
ConfigurableThreadPoolExecutor {
+
+ public SimpleThreadPoolExecutor() {
+ super(1, 2, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Tika Executor Thread");
+ }
+ });
+ }
+}
Modified:
tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java?rev=1708723&r1=1708722&r2=1708723&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
(original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
Thu Oct 15 02:08:04 2015
@@ -29,12 +29,15 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import javax.imageio.spi.ServiceRegistry;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
+import org.apache.tika.concurrent.ConfigurableThreadPoolExecutor;
+import org.apache.tika.concurrent.SimpleThreadPoolExecutor;
import org.apache.tika.detect.CompositeDetector;
import org.apache.tika.detect.DefaultDetector;
import org.apache.tika.detect.Detector;
@@ -79,12 +82,18 @@ public class TikaConfig {
private static Translator getDefaultTranslator(ServiceLoader loader) {
return new DefaultTranslator(loader);
}
+
+ /**
+ * Creates the default executor service. Each call returns a new
+ * {@link SimpleThreadPoolExecutor}; nothing is cached or shared here.
+ *
+ * @return a newly constructed default executor
+ */
+ private static ConfigurableThreadPoolExecutor getDefaultExecutorService() {
+ return new SimpleThreadPoolExecutor();
+ }
+
private final ServiceLoader serviceLoader;
private final CompositeParser parser;
private final CompositeDetector detector;
private final Translator translator;
private final MimeTypes mimeTypes;
+ private final ExecutorService executorService;
public TikaConfig(String file)
throws TikaException, IOException, SAXException {
@@ -139,11 +148,13 @@ public class TikaConfig {
ParserXmlLoader parserLoader = new ParserXmlLoader();
DetectorXmlLoader detectorLoader = new DetectorXmlLoader();
TranslatorXmlLoader translatorLoader = new TranslatorXmlLoader();
+ ExecutorServiceXmlLoader executorLoader = new
ExecutorServiceXmlLoader();
this.mimeTypes = typesFromDomElement(element);
this.detector = detectorLoader.loadOverall(element, mimeTypes, loader);
this.parser = parserLoader.loadOverall(element, mimeTypes, loader);
this.translator = translatorLoader.loadOverall(element, mimeTypes,
loader);
+ this.executorService = executorLoader.loadOverall(element, mimeTypes,
loader);
this.serviceLoader = loader;
}
@@ -166,6 +177,7 @@ public class TikaConfig {
this.detector = getDefaultDetector(mimeTypes, serviceLoader);
this.parser = getDefaultParser(mimeTypes, serviceLoader);
this.translator = getDefaultTranslator(serviceLoader);
+ this.executorService = getDefaultExecutorService();
}
/**
@@ -198,6 +210,7 @@ public class TikaConfig {
this.parser = getDefaultParser(mimeTypes, serviceLoader);
this.detector = getDefaultDetector(mimeTypes, serviceLoader);
this.translator = getDefaultTranslator(serviceLoader);
+ this.executorService = getDefaultExecutorService();
} else {
// Locate the given configuration file
InputStream stream = null;
@@ -224,11 +237,13 @@ public class TikaConfig {
ParserXmlLoader parserLoader = new ParserXmlLoader();
DetectorXmlLoader detectorLoader = new DetectorXmlLoader();
TranslatorXmlLoader translatorLoader = new
TranslatorXmlLoader();
+ ExecutorServiceXmlLoader executorLoader = new
ExecutorServiceXmlLoader();
this.mimeTypes = typesFromDomElement(element);
this.parser = parserLoader.loadOverall(element, mimeTypes,
serviceLoader);
this.detector = detectorLoader.loadOverall(element, mimeTypes,
serviceLoader);
this.translator = translatorLoader.loadOverall(element,
mimeTypes, serviceLoader);
+ this.executorService = executorLoader.loadOverall(element,
mimeTypes, serviceLoader);
} catch (SAXException e) {
throw new TikaException(
"Specified Tika configuration has syntax errors: "
@@ -287,6 +302,10 @@ public class TikaConfig {
public Translator getTranslator() {
return translator;
}
+
+ /**
+ * Returns the executor service held by this configuration. The field is
+ * assigned once during construction, either by the executor XML loader or
+ * from the default executor.
+ *
+ * @return the executor service of this configuration
+ */
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
public MimeTypes getMimeRepository(){
return mimeTypes;
@@ -788,4 +807,77 @@ public class TikaConfig {
return created; // No decoration of Translators
}
}
+
+ private static class ExecutorServiceXmlLoader extends
XmlLoader<ConfigurableThreadPoolExecutor,ConfigurableThreadPoolExecutor> {
+ @Override
+ ConfigurableThreadPoolExecutor createComposite(
+ Class<? extends ConfigurableThreadPoolExecutor> compositeClass,
+ List<ConfigurableThreadPoolExecutor> children,
+ Set<Class<? extends ConfigurableThreadPoolExecutor>>
excludeChildren,
+ MimeTypes mimeTypes, ServiceLoader loader)
+ throws InvocationTargetException, IllegalAccessException,
+ InstantiationException {
+ throw new InstantiationException("Only one executor service
supported");
+ }
+
+ @Override
+ ConfigurableThreadPoolExecutor
createComposite(List<ConfigurableThreadPoolExecutor> loaded,
+ MimeTypes mimeTypes, ServiceLoader loader) {
+ return loaded.get(0);
+ }
+
+ @Override
+ ConfigurableThreadPoolExecutor createDefault(MimeTypes mimeTypes,
ServiceLoader loader) {
+ return getDefaultExecutorService();
+ }
+
+ @Override
+ ConfigurableThreadPoolExecutor decorate(ConfigurableThreadPoolExecutor
created, Element element)
+ throws IOException, TikaException {
+ Element coreThreadElement = getChild(element, "core-threads");
+ if(coreThreadElement != null)
+ {
+
created.setCorePoolSize(Integer.parseInt(getText(coreThreadElement)));
+ }
+ Element maxThreadElement = getChild(element, "max-threads");
+ if(maxThreadElement != null)
+ {
+
created.setMaximumPoolSize(Integer.parseInt(getText(maxThreadElement)));
+ }
+ return created;
+ }
+
+ @Override
+ Class<? extends ConfigurableThreadPoolExecutor> getLoaderClass() {
+ return ConfigurableThreadPoolExecutor.class;
+ }
+
+ @Override
+ ConfigurableThreadPoolExecutor loadOne(Element element, MimeTypes
mimeTypes,
+ ServiceLoader loader) throws TikaException, IOException {
+ return super.loadOne(element, mimeTypes, loader);
+ }
+
+ @Override
+ boolean supportsComposite() {return false;}
+
+ @Override
+ String getParentTagName() {return null;}
+
+ @Override
+ String getLoaderTagName() {return "executor-service";}
+
+ @Override
+ boolean isComposite(ConfigurableThreadPoolExecutor loaded) {return
false;}
+
+ @Override
+ boolean isComposite(Class<? extends ConfigurableThreadPoolExecutor>
loadedClass) {return false;}
+
+ @Override
+ ConfigurableThreadPoolExecutor preLoadOne(
+ Class<? extends ConfigurableThreadPoolExecutor> loadedClass,
String classname,
+ MimeTypes mimeTypes) throws TikaException {
+ return null;
+ }
+ }
}
Added:
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
(added)
+++
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Concurrency helpers for Tika.
+ *
+ * @since Apache Tika 1.11
+ */
+public class ConcurrentUtils {
+
+    /**
+     * Runs a task asynchronously. If the parse context carries an
+     * {@link ExecutorService}, the task is submitted to it; otherwise the task
+     * is started on a new thread named "Tika Thread".
+     *
+     * @param context parse context that may hold an {@link ExecutorService}
+     * @param runnable the task to run
+     * @return a future for the task; when the dedicated-thread fallback is used
+     *         its {@code get()} yields {@code null} once the task has finished
+     */
+    public static Future execute(ParseContext context, Runnable runnable) {
+        ExecutorService pool = context.get(ExecutorService.class);
+        if (pool != null) {
+            return pool.submit(runnable);
+        }
+
+        // No executor available: run the task on its own thread.
+        FutureTask<Object> standalone = new FutureTask<Object>(runnable, null);
+        new Thread(standalone, "Tika Thread").start();
+        return standalone;
+    }
+}
Added:
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
(added)
+++
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.config;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tika.concurrent.ConfigurableThreadPoolExecutor;
+
+public class DummyExecutor extends ThreadPoolExecutor implements
ConfigurableThreadPoolExecutor {
+ public DummyExecutor()
+ {
+ super(1,1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ }
+}
Modified:
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java?rev=1708723&r1=1708722&r2=1708723&view=diff
==============================================================================
---
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java
(original)
+++
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java
Thu Oct 15 02:08:04 2015
@@ -21,8 +21,12 @@ import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.tika.ResourceLoggingClassLoader;
+import org.apache.tika.config.DummyExecutor;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.config.TikaConfigTest;
import org.apache.tika.exception.TikaException;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.CompositeParser;
@@ -243,4 +247,17 @@ public class TikaConfigTest extends Abst
assertTrue("Dynamic Service Loading Should be true", dynamicValue);
}
+
+ @Test
+ public void testTikaExecutorServiceFromConfig() throws Exception {
+ URL url = TikaConfigTest.class.getResource("TIKA-1762-executors.xml");
+
+ TikaConfig config = new TikaConfig(url);
+
+ ThreadPoolExecutor executorService =
(ThreadPoolExecutor)config.getExecutorService();
+
+ assertTrue("Should use Dummy Executor", (executorService instanceof
DummyExecutor));
+ assertEquals("Should have configured Core Threads", 3,
executorService.getCorePoolSize());
+ assertEquals("Should have configured Max Threads", 10,
executorService.getMaximumPoolSize());
+ }
}
\ No newline at end of file
Added:
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
(added)
+++
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.utils;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.ParseContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ConcurrentUtils#execute(ParseContext, Runnable)}, covering
+ * both the dedicated-thread fallback and the executor-backed path.
+ */
+public class ConcurrentUtilsTest {
+
+    /**
+     * Builds a task that does nothing, so the tests only observe completion.
+     */
+    private static Runnable noOp() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                //Do nothing
+            }
+        };
+    }
+
+    /**
+     * With no executor in the context, the task runs on its own thread.
+     */
+    @Test
+    public void testExecuteThread() throws Exception {
+        Future outcome = ConcurrentUtils.execute(new ParseContext(), noOp());
+
+        assertNull(outcome.get());
+    }
+
+    /**
+     * With the default configuration's executor in the context, the task is
+     * submitted to that executor.
+     */
+    @Test
+    public void testExecuteExecutor() throws Exception {
+        ParseContext context = new ParseContext();
+        context.set(ExecutorService.class, TikaConfig.getDefaultConfig().getExecutorService());
+
+        Future outcome = ConcurrentUtils.execute(context, noOp());
+
+        assertNull(outcome.get());
+    }
+
+}
Added:
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
URL:
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml?rev=1708723&view=auto
==============================================================================
---
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
(added)
+++
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<properties>
+ <service-loader dynamic="true"/>
+ <parsers>
+ <parser class="org.apache.tika.config.DummyParser"/>
+ </parsers>
+ <!-- Read by TikaConfigTest.testTikaExecutorServiceFromConfig, which expects a
+ DummyExecutor with a core pool size of 3 and a maximum pool size of 10. -->
+ <executor-service class="org.apache.tika.config.DummyExecutor">
+ <core-threads>3</core-threads>
+ <max-threads>10</max-threads>
+ </executor-service>
+</properties>