Author: bob
Date: Thu Oct 15 02:08:04 2015
New Revision: 1708723

URL: http://svn.apache.org/viewvc?rev=1708723&view=rev
Log:
TIKA-1762 - Create a Configurable ExecutorService in TikaConfig

Added:
    tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/
    
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
    
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
    
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
    tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
    
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
    
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
Modified:
    tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
    
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java

Added: 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
 (added)
+++ 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/ConfigurableThreadPoolExecutor.java
 Thu Oct 15 02:08:04 2015
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * An {@link ExecutorService} whose core and maximum pool sizes can be set.
+ *
+ * @since Apache Tika 1.11
+ */
+public interface ConfigurableThreadPoolExecutor extends ExecutorService {
+    /** Sets the maximum pool size to {@code threads}. */
+    void setMaximumPoolSize(int threads);
+
+    /** Sets the core pool size to {@code threads}. */
+    void setCorePoolSize(int threads);
+}

Added: 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
 (added)
+++ 
tika/trunk/tika-core/src/main/java/org/apache/tika/concurrent/SimpleThreadPoolExecutor.java
 Thu Oct 15 02:08:04 2015
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.concurrent;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default {@link ConfigurableThreadPoolExecutor}: a {@link ThreadPoolExecutor}
+ * created with a core size of 1, a maximum size of 2, a keep-alive of 0
+ * seconds, a {@link LinkedBlockingQueue} work queue and threads named
+ * "Tika Executor Thread".
+ *
+ * @since Apache Tika 1.11
+ */
+public class SimpleThreadPoolExecutor extends ThreadPoolExecutor
+        implements ConfigurableThreadPoolExecutor {
+
+    public SimpleThreadPoolExecutor() {
+        super(1, 2, 0L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory());
+    }
+
+    /** Gives every thread it creates the name "Tika Executor Thread". */
+    private static final class NamedThreadFactory implements ThreadFactory {
+        @Override
+        public Thread newThread(Runnable task) {
+            return new Thread(task, "Tika Executor Thread");
+        }
+    }
+}

Modified: 
tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java?rev=1708723&r1=1708722&r2=1708723&view=diff
==============================================================================
--- tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java 
(original)
+++ tika/trunk/tika-core/src/main/java/org/apache/tika/config/TikaConfig.java 
Thu Oct 15 02:08:04 2015
@@ -29,12 +29,15 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 import javax.imageio.spi.ServiceRegistry;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
+import org.apache.tika.concurrent.ConfigurableThreadPoolExecutor;
+import org.apache.tika.concurrent.SimpleThreadPoolExecutor;
 import org.apache.tika.detect.CompositeDetector;
 import org.apache.tika.detect.DefaultDetector;
 import org.apache.tika.detect.Detector;
@@ -79,12 +82,18 @@ public class TikaConfig {
     private static Translator getDefaultTranslator(ServiceLoader loader) {
         return new DefaultTranslator(loader);
     }
+
+    /**
+     * Creates a new {@link SimpleThreadPoolExecutor}, the executor service
+     * used when none is loaded from a configuration file.
+     */
+    private static ConfigurableThreadPoolExecutor getDefaultExecutorService() {
+        return new SimpleThreadPoolExecutor();
+    }
+
     private final ServiceLoader serviceLoader;
     private final CompositeParser parser;
     private final CompositeDetector detector;
     private final Translator translator;
 
     private final MimeTypes mimeTypes;
+    private final ExecutorService executorService;
 
     public TikaConfig(String file)
             throws TikaException, IOException, SAXException {
@@ -139,11 +148,13 @@ public class TikaConfig {
         ParserXmlLoader parserLoader = new ParserXmlLoader();
         DetectorXmlLoader detectorLoader = new DetectorXmlLoader();
         TranslatorXmlLoader translatorLoader = new TranslatorXmlLoader();
+        ExecutorServiceXmlLoader executorLoader = new 
ExecutorServiceXmlLoader();
         
         this.mimeTypes = typesFromDomElement(element);
         this.detector = detectorLoader.loadOverall(element, mimeTypes, loader);
         this.parser = parserLoader.loadOverall(element, mimeTypes, loader);
         this.translator = translatorLoader.loadOverall(element, mimeTypes, 
loader);
+        this.executorService = executorLoader.loadOverall(element, mimeTypes, 
loader);
         this.serviceLoader = loader;
     }
 
@@ -166,6 +177,7 @@ public class TikaConfig {
         this.detector = getDefaultDetector(mimeTypes, serviceLoader);
         this.parser = getDefaultParser(mimeTypes, serviceLoader);
         this.translator = getDefaultTranslator(serviceLoader);
+        this.executorService = getDefaultExecutorService();
     }
 
     /**
@@ -198,6 +210,7 @@ public class TikaConfig {
             this.parser = getDefaultParser(mimeTypes, serviceLoader);
             this.detector = getDefaultDetector(mimeTypes, serviceLoader);
             this.translator = getDefaultTranslator(serviceLoader);
+            this.executorService = getDefaultExecutorService();
         } else {
             // Locate the given configuration file
             InputStream stream = null;
@@ -224,11 +237,13 @@ public class TikaConfig {
                 ParserXmlLoader parserLoader = new ParserXmlLoader();
                 DetectorXmlLoader detectorLoader = new DetectorXmlLoader();
                 TranslatorXmlLoader translatorLoader = new 
TranslatorXmlLoader();
+                ExecutorServiceXmlLoader executorLoader = new 
ExecutorServiceXmlLoader();
                 
                 this.mimeTypes = typesFromDomElement(element);
                 this.parser = parserLoader.loadOverall(element, mimeTypes, 
serviceLoader);
                 this.detector = detectorLoader.loadOverall(element, mimeTypes, 
serviceLoader);
                 this.translator = translatorLoader.loadOverall(element, 
mimeTypes, serviceLoader);
+                this.executorService = executorLoader.loadOverall(element, 
mimeTypes, serviceLoader);
             } catch (SAXException e) {
                 throw new TikaException(
                         "Specified Tika configuration has syntax errors: "
@@ -287,6 +302,10 @@ public class TikaConfig {
     public Translator getTranslator() {
         return translator;
     }
+    
+    /**
+     * Returns the executor service held by this configuration.
+     *
+     * @return the executor service assigned when this TikaConfig was built
+     */
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
 
     public MimeTypes getMimeRepository(){
         return mimeTypes;
@@ -788,4 +807,77 @@ public class TikaConfig {
             return created; // No decoration of Translators
         }        
     }
+    
+    /**
+     * Loads the {@code executor-service} element of a Tika configuration into
+     * a {@link ConfigurableThreadPoolExecutor}. Composite executors are not
+     * supported: only one executor service can be configured.
+     */
+    private static class ExecutorServiceXmlLoader
+            extends XmlLoader<ConfigurableThreadPoolExecutor,ConfigurableThreadPoolExecutor> {
+        /** Always fails: executors cannot be combined into a composite. */
+        @Override
+        ConfigurableThreadPoolExecutor createComposite(
+                Class<? extends ConfigurableThreadPoolExecutor> compositeClass,
+                List<ConfigurableThreadPoolExecutor> children,
+                Set<Class<? extends ConfigurableThreadPoolExecutor>> excludeChildren,
+                MimeTypes mimeTypes, ServiceLoader loader)
+                throws InvocationTargetException, IllegalAccessException,
+                InstantiationException {
+            throw new InstantiationException("Only one executor service supported");
+        }
+
+        /** Returns the first loaded executor; any further entries are ignored. */
+        @Override
+        ConfigurableThreadPoolExecutor createComposite(List<ConfigurableThreadPoolExecutor> loaded,
+                MimeTypes mimeTypes, ServiceLoader loader) {
+            return loaded.get(0);
+        }
+
+        /** Falls back to the default executor service of TikaConfig. */
+        @Override
+        ConfigurableThreadPoolExecutor createDefault(MimeTypes mimeTypes, ServiceLoader loader) {
+            return getDefaultExecutorService();
+        }
+
+        /**
+         * Applies the optional {@code max-threads} and {@code core-threads}
+         * child elements to the executor.
+         * <p>
+         * Both values are parsed before either is applied, so a malformed
+         * value leaves the executor untouched. The maximum is applied before
+         * the core size because a {@code java.util.concurrent.ThreadPoolExecutor}
+         * on Java 9 and later rejects a core size above its current maximum.
+         * A new maximum below the executor's current core size is still
+         * rejected by the executor itself.
+         *
+         * @param created the executor to configure
+         * @param element the {@code executor-service} element
+         * @return {@code created}, after the sizes have been applied
+         * @throws TikaException if a thread count is not a valid integer
+         */
+        @Override
+        ConfigurableThreadPoolExecutor decorate(ConfigurableThreadPoolExecutor created, Element element)
+                throws IOException, TikaException {
+            Integer maxThreads = readThreadCount(element, "max-threads");
+            Integer coreThreads = readThreadCount(element, "core-threads");
+            if (maxThreads != null) {
+                created.setMaximumPoolSize(maxThreads);
+            }
+            if (coreThreads != null) {
+                created.setCorePoolSize(coreThreads);
+            }
+            return created;
+        }
+
+        /**
+         * Reads the integer text of the named child element.
+         *
+         * @return the parsed value, or {@code null} if the child is absent
+         * @throws TikaException if the text is not a valid integer
+         */
+        private Integer readThreadCount(Element element, String tagName)
+                throws IOException, TikaException {
+            Element child = getChild(element, tagName);
+            if (child == null) {
+                return null;
+            }
+            String text = getText(child);
+            try {
+                // Integer.valueOf(null) also raises NumberFormatException.
+                return Integer.valueOf(text == null ? null : text.trim());
+            } catch (NumberFormatException e) {
+                throw new TikaException("Invalid value for <" + tagName
+                        + "> in executor-service configuration: '" + text + "'", e);
+            }
+        }
+
+        @Override
+        Class<? extends ConfigurableThreadPoolExecutor> getLoaderClass() {
+            return ConfigurableThreadPoolExecutor.class;
+        }
+
+        /** Delegates unchanged to the XmlLoader implementation. */
+        @Override
+        ConfigurableThreadPoolExecutor loadOne(Element element, MimeTypes mimeTypes,
+                ServiceLoader loader) throws TikaException, IOException {
+            return super.loadOne(element, mimeTypes, loader);
+        }
+
+        @Override
+        boolean supportsComposite() {return false;}
+
+        // NOTE(review): presumably null means the element has no wrapper tag
+        // and sits directly under the root element - confirm against XmlLoader.
+        @Override
+        String getParentTagName() {return null;}
+
+        @Override
+        String getLoaderTagName() {return "executor-service";}
+
+        @Override
+        boolean isComposite(ConfigurableThreadPoolExecutor loaded) {return false;}
+
+        @Override
+        boolean isComposite(Class<? extends ConfigurableThreadPoolExecutor> loadedClass) {return false;}
+
+        // NOTE(review): presumably null tells XmlLoader that no pre-built
+        // instance exists and the class must be instantiated - confirm.
+        @Override
+        ConfigurableThreadPoolExecutor preLoadOne(
+                Class<? extends ConfigurableThreadPoolExecutor> loadedClass, String classname,
+                MimeTypes mimeTypes) throws TikaException {
+            return null;
+        }
+    }
 }

Added: 
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java 
(added)
+++ 
tika/trunk/tika-core/src/main/java/org/apache/tika/utils/ConcurrentUtils.java 
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+
+import org.apache.tika.parser.ParseContext;
+
+/**
+ * Concurrency helpers for Tika.
+ *
+ * @since Apache Tika 1.11
+ */
+public class ConcurrentUtils {
+
+    /**
+     * Runs a task asynchronously.
+     * <p>
+     * If the {@link ParseContext} holds an {@link ExecutorService}, the task
+     * is submitted to it. Otherwise a new thread named "Tika Thread" is
+     * started for this one task.
+     *
+     * @param context parse context that is asked for an ExecutorService
+     * @param runnable the task to run
+     * @return a Future for the task; its {@code get()} yields {@code null}
+     *         once the task has completed
+     */
+    public static Future execute(ParseContext context, Runnable runnable) {
+        ExecutorService pool = context.get(ExecutorService.class);
+        if (pool != null) {
+            return pool.submit(runnable);
+        }
+
+        // No executor available: run the task on a thread of its own.
+        FutureTask fallback = new FutureTask<>(runnable, null);
+        Thread worker = new Thread(fallback, "Tika Thread");
+        worker.start();
+        return fallback;
+    }
+}

Added: 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java 
(added)
+++ 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/DummyExecutor.java 
Thu Oct 15 02:08:04 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.config;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tika.concurrent.ConfigurableThreadPoolExecutor;
+
+/**
+ * Test executor created with a core size of 1 and a maximum size of 1.
+ */
+public class DummyExecutor extends ThreadPoolExecutor
+        implements ConfigurableThreadPoolExecutor {
+
+    public DummyExecutor() {
+        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    }
+}

Modified: 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java?rev=1708723&r1=1708722&r2=1708723&view=diff
==============================================================================
--- 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java 
(original)
+++ 
tika/trunk/tika-core/src/test/java/org/apache/tika/config/TikaConfigTest.java 
Thu Oct 15 02:08:04 2015
@@ -21,8 +21,12 @@ import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.tika.ResourceLoggingClassLoader;
+import org.apache.tika.config.DummyExecutor;
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.config.TikaConfigTest;
 import org.apache.tika.exception.TikaException;
 import org.apache.tika.parser.AutoDetectParser;
 import org.apache.tika.parser.CompositeParser;
@@ -243,4 +247,17 @@ public class TikaConfigTest extends Abst
         
         assertTrue("Dynamic Service Loading Should be true", dynamicValue);
     }
+    
+    /**
+     * Loading TIKA-1762-executors.xml should yield a DummyExecutor with
+     * 3 core threads and a maximum of 10 threads.
+     */
+    @Test
+    public void testTikaExecutorServiceFromConfig() throws Exception {
+        URL configUrl = TikaConfigTest.class.getResource("TIKA-1762-executors.xml");
+        TikaConfig tikaConfig = new TikaConfig(configUrl);
+
+        ThreadPoolExecutor pool = (ThreadPoolExecutor) tikaConfig.getExecutorService();
+
+        assertTrue("Should use Dummy Executor", pool instanceof DummyExecutor);
+        assertEquals("Should have configured Core Threads", 3, pool.getCorePoolSize());
+        assertEquals("Should have configured Max Threads", 10, pool.getMaximumPoolSize());
+    }
 }
\ No newline at end of file

Added: 
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
 (added)
+++ 
tika/trunk/tika-core/src/test/java/org/apache/tika/utils/ConcurrentUtilsTest.java
 Thu Oct 15 02:08:04 2015
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tika.utils;
+
+import static org.junit.Assert.*;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.tika.config.TikaConfig;
+import org.apache.tika.parser.ParseContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ConcurrentUtils#execute(ParseContext, Runnable)}.
+ */
+public class ConcurrentUtilsTest {
+
+    /** Task that does nothing; the tests only check that its Future completes. */
+    private static final Runnable NO_OP = new Runnable() {
+        @Override
+        public void run() {
+            //Do nothing
+        }
+    };
+
+    /** A context without an ExecutorService. */
+    @Test
+    public void testExecuteThread() throws Exception {
+        Future result = ConcurrentUtils.execute(new ParseContext(), NO_OP);
+
+        assertNull(result.get());
+    }
+
+    /** A context carrying the default configuration's ExecutorService. */
+    @Test
+    public void testExecuteExecutor() throws Exception {
+        ParseContext context = new ParseContext();
+        context.set(ExecutorService.class, TikaConfig.getDefaultConfig().getExecutorService());
+
+        Future result = ConcurrentUtils.execute(context, NO_OP);
+
+        assertNull(result.get());
+    }
+
+}

Added: 
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
URL: 
http://svn.apache.org/viewvc/tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml?rev=1708723&view=auto
==============================================================================
--- 
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
 (added)
+++ 
tika/trunk/tika-core/src/test/resources/org/apache/tika/config/TIKA-1762-executors.xml
 Thu Oct 15 02:08:04 2015
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<properties>
+  <service-loader dynamic="true"/>
+  <parsers>
+    <parser class="org.apache.tika.config.DummyParser"/>
+  </parsers>
+  <executor-service class="org.apache.tika.config.DummyExecutor">
+    <core-threads>3</core-threads>
+    <max-threads>10</max-threads>
+  </executor-service>
+</properties>


Reply via email to