Repository: incubator-blur Updated Branches: refs/heads/master 0eaefea45 -> 9ae5bf35b
Adding first cut of stream server and client along with a start to a spark rdd for blur. Disabled by default, considered experiemental at this point. Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/9ae5bf35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/9ae5bf35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/9ae5bf35 Branch: refs/heads/master Commit: 9ae5bf35b5eea0015e456873e8288961da70a9aa Parents: 0eaefea Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Sep 28 10:45:06 2015 -0400 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Sep 28 10:45:06 2015 -0400 ---------------------------------------------------------------------- .../blur/command/stream/StreamClient.java | 180 ++++++++++++ .../blur/command/stream/StreamCommand.java | 30 ++ .../blur/command/stream/StreamComplete.java | 25 ++ .../apache/blur/command/stream/StreamError.java | 53 ++++ .../blur/command/stream/StreamFunction.java | 27 ++ .../blur/command/stream/StreamIndexContext.java | 86 ++++++ .../blur/command/stream/StreamProcessor.java | 225 +++++++++++++++ .../blur/command/stream/StreamServer.java | 241 ++++++++++++++++ .../apache/blur/command/stream/StreamSplit.java | 64 +++++ .../apache/blur/command/stream/StreamUtil.java | 56 ++++ .../blur/command/stream/StreamWriter.java | 27 ++ .../blur/thrift/ThriftBlurShardServer.java | 24 +- .../blur/command/stream/StreamServerTest.java | 285 +++++++++++++++++++ blur-spark/pom.xml | 10 +- .../java/org/apache/blur/spark/BlurRDD.java | 175 ++++++++++++ .../org/apache/blur/spark/BlurSparkSplit.java | 51 ++++ .../org/apache/blur/spark/BlurSparkUtil.java | 168 +++++++++++ .../org/apache/blur/spark/UsingBlurRDD.java | 83 ++++++ blur-stream-client/pom.xml | 116 -------- blur-stream-server/pom.xml | 116 -------- .../org/apache/blur/utils/BlurConstants.java | 2 + .../src/main/resources/blur-default.properties | 3 + 22 files changed, 1806 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java new file mode 100644 index 0000000..20d1bcd --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamClient.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.net.Socket; +import java.util.Arrays; +import java.util.Iterator; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.ByteArrayOutputStream; + +import com.google.common.io.Closer; + +public class StreamClient implements Closeable { + + private static final Log LOG = LogFactory.getLog(StreamClient.class); + + private final String _host; + private final int _port; + private final Socket _socket; + private final DataInputStream _dataInputStream; + private final DataOutputStream _dataOutputStream; + private final Closer _closer = Closer.create(); + + public StreamClient(String host, int port, int timeout) throws IOException { + _host = host; + _port = port; + _socket = _closer.register(new Socket(_host, _port)); + _socket.setTcpNoDelay(true); + _socket.setSoTimeout(timeout); + _dataInputStream = new DataInputStream(_closer.register(_socket.getInputStream())); + _dataOutputStream = new DataOutputStream(_closer.register(_socket.getOutputStream())); + } + + public String getHost() { + return _host; + } + + public int getPort() { + return _port; + } + + public DataInputStream getDataInputStream() { + return _dataInputStream; + } + + public DataOutputStream getDataOutStream() { + return _dataOutputStream; + } + + @Override + public void close() throws IOException { + _closer.close(); + } + + public boolean isClassLoaderAvailable(String classLoaderId) throws IOException { + _dataOutputStream.write(StreamCommand.CLASS_LOAD_CHECK.getCommand()); + StreamUtil.writeString(_dataOutputStream, classLoaderId); + _dataOutputStream.flush(); + String message = StreamUtil.readString(_dataInputStream); + if (message.equals("OK")) { + return true; + } else { + return false; + } + } + + public void loadJars(String classLoaderId, Iterable<String> testJars) throws IOException { + _dataOutputStream.write(StreamCommand.CLASS_LOAD.getCommand()); + StreamUtil.writeString(_dataOutputStream, classLoaderId); + sendZip(testJars, _dataOutputStream); + _dataOutputStream.flush(); + String message = StreamUtil.readString(_dataInputStream); + if (!message.equals("LOADED")) { + throw new IOException("Unknown error during load, check logs on server."); + } + } + + private void sendZip(Iterable<String> testJars, DataOutputStream outputStream) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ZipOutputStream zipOutputStream = new ZipOutputStream(out); + for (String jar : testJars) { + LOG.info("Sending jar [{0}]", jar); + File file = new File(jar); + FileInputStream in = new FileInputStream(file); + zipOutputStream.putNextEntry(new ZipEntry(file.getName())); + IOUtils.copy(in, zipOutputStream); + in.close(); + zipOutputStream.closeEntry(); + } + zipOutputStream.close(); + byte[] bs = out.toByteArray(); + outputStream.writeInt(bs.length); + outputStream.write(bs); + } + + public void loadJars(String classLoaderId, String... testJars) throws IOException { + loadJars(classLoaderId, Arrays.asList(testJars)); + } + + public <T> Iterable<T> executeStream(StreamSplit streamSplit, StreamFunction<T> streamFunction) throws IOException { + return new Iterable<T>() { + @Override + public Iterator<T> iterator() { + try { + _dataOutputStream.write(StreamCommand.STREAM.getCommand()); + byte[] streamSplitBytes = StreamUtil.toBytes(streamSplit.copy()); + byte[] streamFunctionBytes = StreamUtil.toBytes(streamFunction); + _dataOutputStream.writeInt(streamSplitBytes.length); + _dataOutputStream.write(streamSplitBytes); + _dataOutputStream.writeInt(streamFunctionBytes.length); + _dataOutputStream.write(streamFunctionBytes); + _dataOutputStream.flush(); + ObjectInputStream objectInputStream = new ObjectInputStream(_dataInputStream); + return new Iterator<T>() { + + private boolean _more = true; + private Object _obj; + + @Override + public boolean hasNext() { + if (!_more) { + return false; + } + if (_obj != null) { + return true; + } + Object o; + try { + o = objectInputStream.readObject(); + } catch (ClassNotFoundException | IOException e) { + throw new RuntimeException(e); + } + if (o instanceof StreamComplete) { + return _more = false; + } + _obj = o; + return true; + } + + @SuppressWarnings("unchecked") + @Override + public T next() { + T o = (T) _obj; + _obj = null; + return o; + } + }; + } catch (IOException e) { + throw new RuntimeException("Unknown error.", e); + } + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java new file mode 100644 index 0000000..95563dc --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamCommand.java @@ -0,0 +1,30 @@ +package org.apache.blur.command.stream; + +public enum StreamCommand { + STREAM(1), CLASS_LOAD_CHECK(2), CLASS_LOAD(3), CLOSE(-1); + + private final int _command; + + private StreamCommand(int command) { + _command = command; + } + + public int getCommand() { + return _command; + } + + public static StreamCommand find(int command) { + switch (command) { + case -1: + return CLOSE; + case 1: + return STREAM; + case 2: + return CLASS_LOAD_CHECK; + case 3: + return CLASS_LOAD; + default: + throw new RuntimeException("Command [" + command + "] not found."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java new file mode 100644 index 0000000..95f2706 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamComplete.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.Serializable; + +public class StreamComplete implements Serializable { + + private static final long serialVersionUID = 4117959719555234581L; + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java new file mode 100644 index 0000000..468cd56 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamError.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.PrintWriter; +import java.io.Serializable; +import java.io.StringWriter; + +public class StreamError implements Serializable { + + private static final long serialVersionUID = 5624998869726795714L; + + private final String _message; + private final String _stack; + + public StreamError(Exception e) { + _message = e.getMessage(); + Throwable cause = e.getCause(); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + cause.printStackTrace(pw); + pw.close(); + _stack = sw.toString(); + } + + public String getMessage() { + return _message; + } + + public String getStack() { + return _stack; + } + + @Override + public String toString() { + return "StreamError [_message=" + _message + ", _stack=" + _stack + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java new file mode 100644 index 0000000..7434261 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamFunction.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.Serializable; + +import org.apache.blur.command.IndexContext; + +public interface StreamFunction<T> extends Serializable { + + void call(IndexContext indexContext, StreamWriter<T> writer) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java new file mode 100644 index 0000000..6bcc20c --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamIndexContext.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.blur.BlurConfiguration; +import org.apache.blur.command.IndexContext; +import org.apache.blur.command.Shard; +import org.apache.blur.lucene.search.IndexSearcherCloseable; +import org.apache.blur.manager.writer.BlurIndex; +import org.apache.blur.server.ShardContext; +import org.apache.blur.server.TableContext; +import org.apache.lucene.index.IndexReader; + +public class StreamIndexContext extends IndexContext implements Closeable { + + private final ShardContext _shardContext; + private final TableContext _tableContext; + private final IndexSearcherCloseable _indexSearcher; + private final IndexReader _indexReader; + private final Shard _shard; + + public StreamIndexContext(BlurIndex blurIndex) throws IOException { + _shardContext = blurIndex.getShardContext(); + _tableContext = _shardContext.getTableContext(); + _indexSearcher = blurIndex.getIndexSearcher(); + _indexReader = _indexSearcher.getIndexReader(); + _shard = new Shard(_tableContext.getTable(), _shardContext.getShard()); + } + + @Override + public TableContext getTableContext(String table) throws IOException { + throw new RuntimeException("Not supported."); + } + + @Override + public BlurConfiguration getBlurConfiguration(String table) throws IOException { + throw new RuntimeException("Not supported."); + } + + @Override + public TableContext getTableContext() throws IOException { + return _tableContext; + } + + @Override + public Shard getShard() { + return _shard; + } + + @Override + public IndexSearcherCloseable getIndexSearcher() { + return _indexSearcher; + } + + @Override + public IndexReader getIndexReader() { + return _indexReader; + } + + @Override + public BlurConfiguration getBlurConfiguration() throws IOException { + return _tableContext.getBlurConfiguration(); + } + + @Override + public void close() throws IOException { + _indexSearcher.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java new file mode 100644 index 0000000..754ef40 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamProcessor.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.io.OutputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.blur.command.IndexContext; +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; +import org.apache.blur.manager.IndexServer; +import org.apache.blur.manager.writer.BlurIndex; +import org.apache.commons.io.IOUtils; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; + +public class StreamProcessor { + + private static final Log LOG = LogFactory.getLog(StreamProcessor.class); + + private final IndexServer _indexServer; + private final Map<String, ClassLoader> _classLoaderMap; + private final File _tmpFile; + + public StreamProcessor(IndexServer indexServer, File tmpFile) { + _indexServer = indexServer; + _classLoaderMap = CacheBuilder.newBuilder().concurrencyLevel(4).maximumSize(128) + .expireAfterAccess(45, TimeUnit.SECONDS).removalListener(new RemovalListener<String, ClassLoader>() { + @Override + public void onRemoval(RemovalNotification<String, ClassLoader> notification) { + String key = notification.getKey(); + LOG.info("Unloading classLoaderId [{0}]", key); + File file = new File(_tmpFile, key); + if (!rmr(file)) { + LOG.error("Could not remove file [{0}]", file); + } + } + }).build().asMap(); + + _tmpFile = tmpFile; + } + + protected boolean rmr(File file) { + boolean success = true; + if (file.exists()) { + if (file.isDirectory()) { + for (File f : file.listFiles()) { + if (!rmr(f)) { + success = false; + } + } + } + if (!file.delete()) { + success = false; + } + } + return success; + } + + public StreamIndexContext getIndexContext(final String table, final String shard) throws IOException { + Map<String, BlurIndex> indexes = _indexServer.getIndexes(table); + if (indexes == null) { + throw new IOException("Table [" + table + "] is not being served by this server."); + } + BlurIndex blurIndex = indexes.get(shard); + if (blurIndex == null) { + throw new IOException("Shard [" + shard + "] for table [" + table + "] is not being served by this server."); + } + return new StreamIndexContext(blurIndex); + } + + public <T> void execute(StreamFunction<T> function, OutputStream outputStream, IndexContext indexContext) + throws IOException { + final ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream); + StreamWriter<T> writer = getWriter(objectOutputStream); + try { + function.call(indexContext, writer); + } catch (Exception e) { + LOG.error("Unknown error.", e); + objectOutputStream.writeObject(new StreamError(e)); + } finally { + objectOutputStream.writeObject(new StreamComplete()); + objectOutputStream.close(); + } + } + + private <T> StreamWriter<T> getWriter(final ObjectOutputStream objectOutputStream) { + final WriteLock writeLock = new ReentrantReadWriteLock(true).writeLock(); + return new StreamWriter<T>() { + @Override + public void write(T obj) throws IOException { + writeLock.lock(); + try { + objectOutputStream.writeObject(obj); + } finally { + writeLock.unlock(); + } + } + + @Override + public void write(Iterable<T> it) throws IOException { + writeLock.lock(); + try { + for (T t : it) { + objectOutputStream.writeObject(t); + } + } finally { + writeLock.unlock(); + } + } + }; + } + + public StreamFunction<?> getStreamFunction(String classLoaderId, InputStream inputStream) throws IOException { + final ClassLoader classLoader = getClassLoader(classLoaderId); + ObjectInputStream objectInputStream = new ObjectInputStream(inputStream) { + @Override + protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { + return classLoader.loadClass(desc.getName()); + } + }; + try { + return (StreamFunction<?>) objectInputStream.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + objectInputStream.close(); + } + } + + private ClassLoader getClassLoader(String classLoaderId) throws IOException { + ClassLoader classLoader = _classLoaderMap.get(classLoaderId); + if (classLoader == null) { + throw new IOException("ClassLoaderId [" + classLoaderId + "] not found."); + } + return classLoader; + } + + public synchronized void loadClassLoader(String classLoaderId, DataInputStream inputStream) throws IOException { + if (_classLoaderMap.containsKey(classLoaderId)) { + // read input and discard + int length = inputStream.readInt(); + byte[] buf = new byte[length]; + inputStream.readFully(buf); + return; + } + + LOG.info("Class Loader [{0}] Starting", classLoaderId); + File copyJarsLocally = copyJarsLocally(classLoaderId, inputStream); + List<URL> urls = new ArrayList<URL>(); + for (File f : copyJarsLocally.listFiles()) { + URL url = f.toURI().toURL(); + LOG.info("Class Loader [{0}] Loading [{1}]", classLoaderId, url); + urls.add(url); + } + URLClassLoader classLoader = new URLClassLoader(urls.toArray(new URL[] {})); + _classLoaderMap.put(classLoaderId, classLoader); + LOG.info("Class Loader [{0}] Complete", classLoaderId); + } + + private File copyJarsLocally(String classLoaderId, DataInputStream inputStream) throws IOException, + FileNotFoundException { + int length = inputStream.readInt(); + byte[] buf = new byte[length]; + inputStream.readFully(buf); + ZipInputStream zipInputStream = new ZipInputStream(new ByteArrayInputStream(buf)); + try { + ZipEntry zipEntry; + File dir = new File(_tmpFile, classLoaderId); + dir.mkdirs(); + while ((zipEntry = zipInputStream.getNextEntry()) != null) { + if (zipEntry.isDirectory()) { + throw new IOException("Dirs in delivery zip are not supported."); + } + String name = zipEntry.getName(); + File file = new File(dir, name); + FileOutputStream output = new FileOutputStream(file); + IOUtils.copy(zipInputStream, output); + output.close(); + } + return dir; + } finally { + zipInputStream.close(); + } + } + + public boolean isClassLoaderLoaded(String id) { + return _classLoaderMap.containsKey(id); + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java new file mode 100644 index 0000000..82bf6a2 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamServer.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.blur.concurrent.Executors; +import org.apache.blur.log.Log; +import org.apache.blur.log.LogFactory; +import org.apache.blur.user.User; +import org.apache.blur.user.UserContext; +import org.apache.commons.io.IOUtils; + +import com.google.common.io.Closer; + +public class StreamServer implements Closeable { + + private static final Log LOG = LogFactory.getLog(StreamServer.class); + + private final int _port; + private final int _threadCount; + private final StreamProcessor _streamProcessor; + private final Closer _closer = Closer.create(); + private final AtomicBoolean _running = new AtomicBoolean(true); + + private ServerSocket _serverSocket; + private ExecutorService _service; + private Thread _thread; + private int _runningPort; + + public StreamServer(int port, int threadCount, StreamProcessor streamProcessor) { + _port = port; + _threadCount = threadCount; + _streamProcessor = streamProcessor; + } + + @Override + public void close() throws IOException { + _closer.close(); + } + + public void start() throws IOException { + _service = Executors.newThreadPool("stream-server", _threadCount); + _serverSocket = new ServerSocket(_port, 1000); + _runningPort = _serverSocket.getLocalPort(); + _thread = new Thread(new Runnable() { + + @Override + public void run() { + while (_running.get()) { + try { + handleSocket(_serverSocket.accept()); + } catch (IOException e) { + LOG.error("Unknown error.", e); + } + } + } + }); + _closer.register(new Closeable() { + @Override + public void close() throws IOException { + _running.set(false); + _thread.interrupt(); + } + }); + _thread.setName("stream-server-main"); + _thread.setDaemon(true); + _thread.start(); + } + + protected void handleSocket(Socket socket) { + _service.submit(new SocketHandler(socket, _streamProcessor)); + } + + private static class SocketHandler implements Runnable { + + private final Socket _socket; + private final Closer _closer = Closer.create(); + private final StreamProcessor _streamProcessor; + + public SocketHandler(Socket socket, StreamProcessor streamProcessor) { + _socket = _closer.register(socket); + _streamProcessor = streamProcessor; + } + + @Override + public void run() { + InputStream inputStream; + OutputStream outputStream; + try { + inputStream = _closer.register(_socket.getInputStream()); + outputStream = _closer.register(_socket.getOutputStream()); + while (true) { + int read = inputStream.read(); + StreamCommand command = StreamCommand.find(read); + switch (command) { + case STREAM: { + executeStream(_streamProcessor, inputStream, outputStream); + break; + } + case CLASS_LOAD: { + executeClassLoad(_streamProcessor, inputStream, outputStream); + break; + } + case CLASS_LOAD_CHECK: { + checkClassLoad(_streamProcessor, inputStream, outputStream); + break; + } + case CLOSE: { + return; + } + default: + throw new RuntimeException("Command [" + command + "] not supported."); + } + } + } catch (Throwable t) { + if (t instanceof SocketException) { + if (t.getMessage().trim().toLowerCase().equals("socket closed")) { + return; + } + } + LOG.error("Unknown error.", t); + } finally { + try { + _closer.close(); + } catch (IOException e) { + LOG.error("Unknown Error"); + } + } + } + } + + public static void executeStream(StreamProcessor streamProcessor, InputStream in, OutputStream outputStream) + throws IOException { + DataInputStream inputStream = new DataInputStream(in); + byte[] streamSplitBytes = getObjectBytes(inputStream); + byte[] functionBytes = getObjectBytes(inputStream); + StreamSplit streamSplit = getStreamSplit(toInputStream(streamSplitBytes)); + String table = streamSplit.getTable(); + String shard = streamSplit.getShard(); + String classLoaderId = streamSplit.getClassLoaderId(); + User user = new User(streamSplit.getUser(), streamSplit.getUserAttributes()); + UserContext.setUser(user); + StreamIndexContext indexContext = null; + try { + indexContext = streamProcessor.getIndexContext(table, shard); + StreamFunction<?> function = streamProcessor.getStreamFunction(classLoaderId, toInputStream(functionBytes)); + streamProcessor.execute(function, outputStream, indexContext); + } finally { + IOUtils.closeQuietly(indexContext); + UserContext.reset(); + } + } + + public static void executeClassLoad(StreamProcessor streamProcessor, InputStream inputStream, + OutputStream outputStream) throws IOException { + DataInputStream in = new DataInputStream(inputStream); + DataOutputStream out = new DataOutputStream(outputStream); + + int length = in.readInt(); + byte[] buf = new byte[length]; + in.readFully(buf); + String id = new String(buf); + streamProcessor.loadClassLoader(id, in); + byte[] bs = "LOADED".getBytes(); + out.writeInt(bs.length); + out.write(bs); + } + + public static void checkClassLoad(StreamProcessor streamProcessor, InputStream inputStream, OutputStream outputStream) + throws IOException { + DataInputStream in = new DataInputStream(inputStream); + DataOutputStream out = new DataOutputStream(outputStream); + + int length = in.readInt(); + byte[] buf = new byte[length]; + in.readFully(buf); + String id = new String(buf); + byte[] bs; + if (streamProcessor.isClassLoaderLoaded(id)) { + bs = "OK".getBytes(); + } else { + bs = "NOT FOUND".getBytes(); + } + out.writeInt(bs.length); + out.write(bs); + } + + private static InputStream toInputStream(byte[] bs) { + return new ByteArrayInputStream(bs); + } + + private static byte[] getObjectBytes(DataInputStream inputStream) throws IOException { + int length = inputStream.readInt(); + byte[] buf = new byte[length]; + inputStream.readFully(buf); + return buf; + } + + private static StreamSplit getStreamSplit(InputStream inputStream) throws IOException { + ObjectInputStream objectInputStream = new ObjectInputStream(inputStream); + try { + return (StreamSplit) objectInputStream.readObject(); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } finally { + objectInputStream.close(); + } + } + + public int getPort() { + return _runningPort; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java new file mode 100644 index 0000000..d45f473 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamSplit.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.Serializable; +import java.util.Map; + +public class StreamSplit implements Serializable { + + private static final long serialVersionUID = -1760098859541747672L; + + private final String table; + private final String shard; + private final String classLoaderId; + private final String user; + private final Map<String, String> userAttributes; + + public StreamSplit(String table, String shard, String classLoaderId, String user, Map<String, String> userAttributes) { + this.table = table; + this.shard = shard; + this.classLoaderId = classLoaderId; + this.user = user; + this.userAttributes = userAttributes; + } + + public String getTable() { + return table; + } + + public String getShard() { + return shard; + } + + public String getClassLoaderId() { + return classLoaderId; + } + + public String getUser() { + return user; + } + + public Map<String, String> getUserAttributes() { + return userAttributes; + } + + public StreamSplit copy() { + return new StreamSplit(table, shard, classLoaderId, user, userAttributes); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java new file mode 100644 index 0000000..e4811b2 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamUtil.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; + +import org.apache.commons.io.output.ByteArrayOutputStream; + +public class StreamUtil { + + private static final String UTF_8 = "UTF-8"; + + public static void writeString(DataOutput output, String s) throws IOException { + try { + byte[] bs = s.getBytes(UTF_8); + output.writeInt(bs.length); + output.write(bs); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("Does not supported UTF-8?", e); + } + } + + public static String readString(DataInput input) throws IOException { + int length = input.readInt(); + byte[] buf = new byte[length]; + input.readFully(buf); + return new String(buf, UTF_8); + } + + public static byte[] toBytes(Serializable s) throws IOException { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(outputStream); + out.writeObject(s); + out.close(); + return outputStream.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java b/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java new file mode 100644 index 0000000..8a0d728 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/command/stream/StreamWriter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import java.io.IOException; + +public interface StreamWriter<T> { + + void write(T obj) throws IOException; + + void write(Iterable<T> it) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java index 15d861c..00e2b22 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java @@ -51,6 +51,8 @@ import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_SMALL_MERGE_THRESHO import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_ACCEPT_QUEUE_SIZE_PER_THREAD; import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_MAX_READ_BUFFER_BYTES; import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_THRIFT_SELECTOR_THREADS; +import static org.apache.blur.utils.BlurConstants.BLUR_STREAM_SERVER_RUNNING_PORT; +import static org.apache.blur.utils.BlurConstants.BLUR_STREAM_SERVER_THREADS; import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_DEFAULT_MAX_FRAME_SIZE; import static org.apache.blur.utils.BlurConstants.BLUR_THRIFT_MAX_FRAME_SIZE; import static org.apache.blur.utils.BlurUtil.quietClose; @@ -66,6 +68,8 @@ import java.util.concurrent.TimeUnit; import org.apache.blur.BlurConfiguration; import org.apache.blur.command.ShardCommandManager; +import org.apache.blur.command.stream.StreamProcessor; +import org.apache.blur.command.stream.StreamServer; import org.apache.blur.concurrent.SimpleUncaughtExceptionHandler; import org.apache.blur.concurrent.ThreadWatcher; import org.apache.blur.gui.HttpJettyServer; @@ -193,7 +197,7 @@ public class ThriftBlurShardServer extends ThriftServer { BlurQueryChecker queryChecker = new BlurQueryChecker(configuration); String cluster = configuration.get(BLUR_CLUSTER_NAME, BLUR_CLUSTER); - + final ZooKeeper zooKeeper = setupZookeeper(configuration, cluster); final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper, configuration, config); @@ -301,6 +305,18 @@ public class ThriftBlurShardServer extends ThriftServer { instanceGuiPort = 0; } + StreamServer streamServer; + int streamThreadCount = configuration.getInt(BLUR_STREAM_SERVER_THREADS, 100); + if (streamThreadCount > 0) { + StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath); + streamServer = new StreamServer(0, streamThreadCount, streamProcessor); + streamServer.start(); + configuration.setInt(BLUR_STREAM_SERVER_RUNNING_PORT, streamServer.getPort()); + LOG.info("Stream server started on port [{0}]", streamServer.getPort()); + } else { + streamServer = null; + } + final HttpJettyServer httpServer; if (configGuiPort >= 0) { httpServer = new HttpJettyServer(HttpJettyServer.class, instanceGuiPort); @@ -339,9 +355,9 @@ public class ThriftBlurShardServer extends ThriftServer { @Override public void shutdown() { ThreadWatcher threadWatcher = ThreadWatcher.instance(); - quietClose(makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer), makeCloseable(indexBulkTimer), - blockCacheDirectoryFactory, commandManager, traceStorage, server, shardServer, indexManager, indexServer, - threadWatcher, clusterStatus, zooKeeper, httpServer); + quietClose(streamServer, makeCloseable(hdfsKeyValueTimer), makeCloseable(indexImporterTimer), + makeCloseable(indexBulkTimer), blockCacheDirectoryFactory, commandManager, traceStorage, server, + shardServer, indexManager, indexServer, threadWatcher, clusterStatus, zooKeeper, httpServer); } }; server.setShutdown(shutdown); http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java ---------------------------------------------------------------------- diff --git a/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java new file mode 100644 index 0000000..da582a3 --- /dev/null +++ b/blur-core/src/test/java/org/apache/blur/command/stream/StreamServerTest.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.command.stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.blur.command.IndexContext; +import org.apache.blur.lucene.search.IndexSearcherCloseable; +import org.apache.blur.manager.IndexServer; +import org.apache.blur.manager.writer.BlurIndex; +import org.apache.blur.manager.writer.IndexAction; +import org.apache.blur.server.ShardContext; +import org.apache.blur.server.TableContext; +import org.apache.blur.thrift.generated.RowMutation; +import org.apache.blur.thrift.generated.ShardState; +import org.apache.blur.thrift.generated.TableDescriptor; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.similarities.Similarity; +import org.junit.Test; + +import com.google.common.base.Splitter; +import com.google.common.io.Closer; + +@SuppressWarnings("serial") +public class StreamServerTest implements Serializable { + + @Test + public void testServer() throws IOException { + Closer closer = Closer.create(); + try { + File tmpFile = new File("./target/tmp/StreamServerTest"); + tmpFile.mkdirs(); + IndexServer indexServer = new TestIndexServer(); + StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpFile); + int timeout = 3000000; + String classLoaderId = UUID.randomUUID().toString(); + + StreamServer server = closer.register(new StreamServer(0, 100, streamProcessor)); + server.start(); + int port = server.getPort(); + StreamClient client = closer.register(new StreamClient("localhost", port, timeout)); + assertFalse(client.isClassLoaderAvailable(classLoaderId)); + client.loadJars(classLoaderId, getTestJar()); + + String table = "test"; + String shard = "shard"; + String user = "test"; + Map<String, String> userAttributes = new HashMap<String, String>(); + StreamSplit split = new StreamSplit(table, shard, classLoaderId, user, userAttributes); + Iterable<String> it = client.executeStream(split, new StreamFunction<String>() { + @Override + public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception { + writer.write("test"); + } + }); + Iterator<String> iterator = it.iterator(); + assertTrue(iterator.hasNext()); + assertEquals("test", iterator.next()); + assertFalse(iterator.hasNext()); + + } finally { + closer.close(); + } + } + + private String getTestJar() { + String property = System.getProperty("java.class.path"); + Splitter splitter = Splitter.on(':'); + for (String s : splitter.split(property)) { + if (s.endsWith(".jar")) { + return s; + } + } + throw new RuntimeException("No jars found?"); + } + + public static class TestIndexServer implements IndexServer { + + @Override + public SortedSet<String> getShardListCurrentServerOnly(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public Map<String, BlurIndex> getIndexes(String table) throws IOException { + Map<String, BlurIndex> map = new HashMap<String, BlurIndex>(); + BlurIndex value = getBlurIndex(); + map.put("shard", value); + return map; + } + + @Override + public String getNodeName() { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getRecordCount(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getRowCount(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getTableSize(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void close() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public Map<String, ShardState> getShardState(String table) { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getSegmentImportInProgressCount(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getSegmentImportPendingCount(String table) throws IOException { + throw new RuntimeException("Not implemented."); + } + + } + + public static BlurIndex getBlurIndex() throws IOException { + String shard = "shard"; + TableDescriptor tableDescriptor = new TableDescriptor(); + tableDescriptor.setName("test"); + tableDescriptor.setTableUri("file:///tmp"); + TableContext tableContext = TableContext.create(tableDescriptor); + ShardContext shardContext = ShardContext.create(tableContext, shard); + return new BlurIndex(shardContext, null, null, null, null, null, null, null) { + + @Override + public void removeSnapshot(String name) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void refresh() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void process(IndexAction indexAction) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void optimize(int numberOfSegmentsPerShard) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public AtomicBoolean isClosed() { + throw new RuntimeException("Not implemented."); + } + + @Override + public List<String> getSnapshots() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getSegmentImportPendingCount() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getSegmentImportInProgressCount() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public long getOnDiskSize() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public IndexSearcherCloseable getIndexSearcher() throws IOException { + return getIndexSearcherCloseable(); + } + + @Override + public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void enqueue(List<RowMutation> mutations) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void createSnapshot(String name) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void close() throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException { + throw new RuntimeException("Not implemented."); + } + }; + } + + protected static IndexSearcherCloseable getIndexSearcherCloseable() { + return new IndexSearcherCloseable() { + + @Override + public void close() throws IOException { + + } + + @Override + public void setSimilarity(Similarity similarity) { + throw new RuntimeException("Not implemented."); + } + + @Override + public void search(Query query, Collector collector) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public TopDocs search(Query query, int i) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public Query rewrite(Query query) throws IOException { + throw new RuntimeException("Not implemented."); + } + + @Override + public IndexReader getIndexReader() { + return null; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/pom.xml ---------------------------------------------------------------------- diff --git a/blur-spark/pom.xml b/blur-spark/pom.xml index 377db61..ac5a41a 100644 --- a/blur-spark/pom.xml +++ b/blur-spark/pom.xml @@ -27,15 +27,15 @@ <dependencies> <dependency> - <groupId>org.apache.blur</groupId> - <artifactId>blur-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> + <dependency> + <groupId>org.apache.blur</groupId> + <artifactId>blur-core</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java ---------------------------------------------------------------------- diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java new file mode 100644 index 0000000..ad743d6 --- /dev/null +++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurRDD.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.spark; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import org.apache.blur.command.stream.StreamClient; +import org.apache.blur.command.stream.StreamFunction; +import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thrift.BlurClient; +import org.apache.blur.thrift.generated.Blur.Iface; +import org.apache.blur.thrift.generated.BlurException; +import org.apache.blur.user.User; +import org.apache.blur.user.UserContext; +import org.apache.blur.utils.BlurConstants; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; + +import com.google.common.base.Splitter; +import com.google.common.io.Closer; + +@SuppressWarnings("serial") +public class BlurRDD implements Serializable { + + private final static String CLASS_LOADER_ID = UUID.randomUUID().toString(); + + private final transient Iface _client; + private final transient SparkConf _sparkConf; + private final List<String> _jars = new ArrayList<String>(); + private final int _timeout = 60000; + + public BlurRDD(String zooKeeperConnectionStr, SparkConf sparkConf) throws IOException { + this(BlurClient.getClientFromZooKeeperConnectionStr(zooKeeperConnectionStr), sparkConf); + } + + public BlurRDD(Iface client, SparkConf sparkConf) throws IOException { + _client = client; + _sparkConf = sparkConf; + if (_sparkConf.contains("spark.jars")) { + String jars = _sparkConf.get("spark.jars"); + for (String jar : Splitter.on(',').split(jars)) { + _jars.add(jar); + } + } + } + + public <T> JavaRDD<T> executeStream(JavaSparkContext context, String table, StreamFunction<T> streamFunction) { + User user = UserContext.getUser(); + List<BlurSparkSplit> splits = getSplits(table, user, CLASS_LOADER_ID); + return context.parallelize(splits).flatMap(new FlatMapFunction<BlurSparkSplit, T>() { + @Override + public Iterable<T> call(BlurSparkSplit t) throws Exception { + return new Iterable<T>() { + @Override + public Iterator<T> iterator() { + Closer closer = Closer.create(); + try { + String host = t.getHost(); + int port = t.getPort(); + int timeout = t.getTimeout(); + StreamClient streamClient = closer.register(new StreamClient(host, port, timeout)); + String classLoaderId = t.getClassLoaderId(); + if (!streamClient.isClassLoaderAvailable(classLoaderId)) { + streamClient.loadJars(classLoaderId, _jars); + } + return wrapClose(closer, streamClient.executeStream(t, streamFunction).iterator()); + } catch (IOException e) { + IOUtils.closeQuietly(closer); + throw new RuntimeException(e); + } + } + }; + } + }); + } + + private static <T> Iterator<T> wrapClose(Closeable c, Iterator<T> t) { + return new Iterator<T>() { + + @Override + public boolean hasNext() { + try { + boolean hasNext = t.hasNext(); + if (!hasNext) { + IOUtils.closeQuietly(c); + } + return hasNext; + } catch (Throwable t) { + IOUtils.closeQuietly(c); + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + } + + @Override + public T next() { + return t.next(); + } + }; + } + + private List<BlurSparkSplit> getSplits(String table, User user, String classLoaderId) { + try { + Map<String, String> shardServerLayout = _client.shardServerLayout(table); + List<BlurSparkSplit> splits = new ArrayList<BlurSparkSplit>(); + for (Entry<String, String> e : shardServerLayout.entrySet()) { + String shard = e.getKey(); + String shardServerWithThriftPort = e.getValue(); + String host = getHost(shardServerWithThriftPort); + int port = getStreamPort(shardServerWithThriftPort); + String username = user.getUsername(); + Map<String, String> attributes = user.getAttributes(); + splits.add(new BlurSparkSplit(host, port, _timeout, table, shard, classLoaderId, username, attributes)); + } + return splits; + } catch (BlurException e) { + throw new RuntimeException(e); + } catch (TException e) { + throw new RuntimeException(e); + } + } + + private String getHost(String shardServerWithThriftPort) { + return shardServerWithThriftPort.substring(0, shardServerWithThriftPort.indexOf(':')); + } + + private int getStreamPort(String shardServerWithThriftPort) throws BlurException, TException { + String port = _client.configurationPerServer(shardServerWithThriftPort, + BlurConstants.BLUR_STREAM_SERVER_RUNNING_PORT); + return Integer.parseInt(port); + } + + public static byte[] toBytes(Serializable s) { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + try { + ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream); + objectOutputStream.writeObject(s); + objectOutputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + return outputStream.toByteArray(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java ---------------------------------------------------------------------- diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java new file mode 100644 index 0000000..59fe968 --- /dev/null +++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkSplit.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.spark; + +import java.util.Map; + +import org.apache.blur.command.stream.StreamSplit; + +public class BlurSparkSplit extends StreamSplit { + + private static final long serialVersionUID = -6636359986869398948L; + + private final String _host; + private final int _port; + private final int _timeout; + + public BlurSparkSplit(String host, int port, int timeout, String table, String shard, String classLoaderId, + String user, Map<String, String> userAttributes) { + super(table, shard, classLoaderId, user, userAttributes); + _host = host; + _port = port; + _timeout = timeout; + } + + public String getHost() { + return _host; + } + + public int getPort() { + return _port; + } + + public int getTimeout() { + return _timeout; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java ---------------------------------------------------------------------- diff --git a/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java new file mode 100644 index 0000000..a1f54fc --- /dev/null +++ b/blur-spark/src/main/java/org/apache/blur/spark/BlurSparkUtil.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.spark; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.HashSet; +import java.util.Set; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; + +import com.google.common.base.Splitter; + +public class BlurSparkUtil { + + private static final char JAR_END = '!'; + private static final String JAR_START = "jar:"; + private static final String TMP_SPARK_JOB = "tmp-spark-job_"; + private static final String JAR = ".jar"; + private static final String SEP = "/"; + private static final String PATH_SEPARATOR = "path.separator"; + private static final String JAVA_CLASS_PATH = "java.class.path"; + + public static void packJars(SparkConf conf, Class<?>... clazzes) throws IOException { + Set<String> classPathThatNeedsToBeIncluded = findJarFiles(clazzes); + Set<String> jars = new HashSet<String>(); + for (String s : classPathThatNeedsToBeIncluded) { + if (isJarFile(s)) { + jars.add(s); + } else { + jars.add(createJar(s)); + } + } + conf.setJars(jars.toArray(new String[jars.size()])); + } + + private static Set<String> findJarFiles(Class<?>[] clazzes) { + Set<String> result = new HashSet<String>(); + for (Class<?> c : clazzes) { + result.add(findJarFile(c)); + } + return result; + } + + private static String findJarFile(Class<?> c) { + String resourceName = "/" + c.getName().replace('.', '/') + ".class"; + URL url = BlurSparkUtil.class.getResource(resourceName); + try { + URI uri = url.toURI(); + if (uri.getScheme().equals("file")) { + return findFileInClassFileUri(uri); + } else if (uri.getScheme().equals("jar")) { + return findFileInJarFileUri(uri); + } else { + throw new RuntimeException("Unsupported schema [" + uri.getScheme() + "] for uri [" + uri + "]"); + } + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static String findFileInClassFileUri(URI uri) { + String classPath = System.getProperty(JAVA_CLASS_PATH); + String pathSeparator = System.getProperty(PATH_SEPARATOR); + Splitter splitter = Splitter.on(pathSeparator); + Iterable<String> split = splitter.split(classPath); + String path = uri.getPath(); + for (String s : split) { + if (path.startsWith(s)) { + return new File(s).getAbsolutePath(); + } + } + throw new RuntimeException("Uri [" + uri + "] was not found on the classpath."); + } + + private static String findFileInJarFileUri(URI uri) throws URISyntaxException { + String s = uri.toString(); + int indexOf1 = s.indexOf(JAR_START); + int indexOf2 = s.indexOf(JAR_END); + return new File(new URI(s.substring(indexOf1 + JAR_START.length(), indexOf2))).getAbsolutePath(); + } + + private static String createJar(String s) throws IOException { + File sourceFile = new File(s); + if (sourceFile.isDirectory()) { + File file = File.createTempFile(TMP_SPARK_JOB, JAR); + OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file)); + JarOutputStream jarOut = new JarOutputStream(outputStream); + for (File f : sourceFile.listFiles()) { + pack(sourceFile, f, jarOut); + } + jarOut.close(); + file.deleteOnExit(); + return file.getAbsolutePath(); + } + throw new RuntimeException("File [" + s + "] is not a directory."); + } + + private static void pack(File rootPath, File source, JarOutputStream target) throws IOException { + String name = getName(rootPath, source); + if (source.isDirectory()) { + if (!SEP.equals(name)) { + JarEntry entry = new JarEntry(name); + entry.setTime(source.lastModified()); + target.putNextEntry(entry); + target.closeEntry(); + } + for (File f : source.listFiles()) { + pack(rootPath, f, target); + } + } else { + JarEntry entry = new JarEntry(name); + entry.setTime(source.lastModified()); + target.putNextEntry(entry); + BufferedInputStream in = new BufferedInputStream(new FileInputStream(source)); + IOUtils.copy(in, target); + in.close(); + target.closeEntry(); + } + } + + private static String getName(File rootPath, File source) { + String rootStr = rootPath.toURI().toString(); + String sourceStr = source.toURI().toString(); + if (sourceStr.startsWith(rootStr)) { + String result = sourceStr.substring(rootStr.length()); + if (source.isDirectory() && !result.endsWith(SEP)) { + result += SEP; + } + return result; + } else { + throw new RuntimeException("Not sure what happened."); + } + } + + private static boolean isJarFile(String s) { + if (s.endsWith(JAR) || s.endsWith(".zip")) { + return true; + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java ---------------------------------------------------------------------- diff --git a/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java b/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java new file mode 100644 index 0000000..a09db32 --- /dev/null +++ b/blur-spark/src/main/java/org/apache/blur/spark/UsingBlurRDD.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.spark; + +import java.io.IOException; + +import org.apache.blur.command.IndexContext; +import org.apache.blur.command.stream.StreamFunction; +import org.apache.blur.command.stream.StreamWriter; +import org.apache.blur.thrift.BlurClient; +import org.apache.blur.thrift.generated.Blur.Iface; +import org.apache.lucene.index.AtomicReader; +import org.apache.lucene.index.AtomicReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.BytesRef; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +public class UsingBlurRDD { + + @SuppressWarnings("serial") + public static void main(String[] args) throws IOException { + SparkConf sparkConf = new SparkConf(); + sparkConf.setAppName("test"); + sparkConf.setMaster("local[2]"); + BlurSparkUtil.packJars(sparkConf, UsingBlurRDD.class); + JavaSparkContext context = new JavaSparkContext(sparkConf); + + Iface client = BlurClient.getClient("127.0.0.1:40020"); + BlurRDD blurRDD = new BlurRDD(client, sparkConf); + String table = "test1234"; + final String field = "fam0.col0"; + + for (int i = 0; i < 1; i++) { + long s = System.nanoTime(); + JavaRDD<String> rdd = blurRDD.executeStream(context, table, new StreamFunction<String>() { + @Override + public void call(IndexContext indexContext, StreamWriter<String> writer) throws Exception { + IndexReader indexReader = indexContext.getIndexReader(); + for (AtomicReaderContext atomicReaderContext : indexReader.leaves()) { + AtomicReader reader = atomicReaderContext.reader(); + Terms terms = reader.fields().terms(field); + if (terms != null) { + TermsEnum termsEnum = terms.iterator(null); + BytesRef ref; + while ((ref = termsEnum.next()) != null) { + writer.write(ref.utf8ToString()); + } + } + } + } + }); + long count = rdd.distinct().count(); + long e = System.nanoTime(); + + System.out.println(count + " " + (e - s) / 1000000.0 + " ms"); + + } + // Iterator<String> iterator = rdd.distinct().toLocalIterator(); + // while (iterator.hasNext()) { + // System.out.println(iterator.next()); + // } + context.close(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-stream-client/pom.xml ---------------------------------------------------------------------- diff --git a/blur-stream-client/pom.xml b/blur-stream-client/pom.xml deleted file mode 100644 index 9571f26..0000000 --- a/blur-stream-client/pom.xml +++ /dev/null @@ -1,116 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" ?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.blur</groupId> - <artifactId>blur</artifactId> - <version>0.3.0.incubating</version> - <relativePath>../pom.xml</relativePath> - </parent> - <groupId>org.apache.blur</groupId> - <artifactId>blur-stream-client</artifactId> - <version>${projectVersion}</version> - <packaging>jar</packaging> - <name>Blur Stream Client</name> - - <dependencies> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>4.0.31.Final</version> - </dependency> - </dependencies> - - <repositories> - <repository> - <id>libdir</id> - <url>file://${basedir}/../lib</url> - </repository> - </repositories> - - <build> - <pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <executions> - <execution> - <id>attach-sources</id> - <goals> - <goal>jar</goal> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-help-plugin</artifactId> - <version>2.2</version> - <executions> - <execution> - <phase>generate-resources</phase> - <goals> - <goal>effective-pom</goal> - </goals> - <configuration> - <output>${project.build.directory}/effective-pom.xml</output> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-install-plugin</artifactId> - <version>2.3.1</version> - <executions> - <execution> - <phase>install</phase> - <goals> - <goal>install-file</goal> - </goals> - <configuration> - <file>${project.build.directory}/${artifactId}-${project.version}.jar</file> - <pomFile>${project.build.directory}/effective-pom.xml</pomFile> - <!-- sources></sources --> - <!-- javadoc></javadoc --> - <groupId>${project.groupId}</groupId> - <artifactId>${project.artifactId}</artifactId> - <version>${project.version}</version> - <packaging>jar</packaging> - <!--classifier></classifier --> - <generatePom>true</generatePom> - <createChecksum>true</createChecksum> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-stream-server/pom.xml ---------------------------------------------------------------------- diff --git a/blur-stream-server/pom.xml b/blur-stream-server/pom.xml deleted file mode 100644 index 02e6d7a..0000000 --- a/blur-stream-server/pom.xml +++ /dev/null @@ -1,116 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" ?> -<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor - license agreements. See the NOTICE file distributed with this work for additional - information regarding copyright ownership. The ASF licenses this file to - you under the Apache License, Version 2.0 (the "License"); you may not use - this file except in compliance with the License. You may obtain a copy of - the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required - by applicable law or agreed to in writing, software distributed under the - License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS - OF ANY KIND, either express or implied. See the License for the specific - language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.blur</groupId> - <artifactId>blur</artifactId> - <version>0.3.0.incubating</version> - <relativePath>../pom.xml</relativePath> - </parent> - <groupId>org.apache.blur</groupId> - <artifactId>blur-stream-server</artifactId> - <version>${projectVersion}</version> - <packaging>jar</packaging> - <name>Blur Stream Server</name> - - <dependencies> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>4.0.31.Final</version> - </dependency> - </dependencies> - - <repositories> - <repository> - <id>libdir</id> - <url>file://${basedir}/../lib</url> - </repository> - </repositories> - - <build> - <pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <executions> - <execution> - <id>attach-sources</id> - <goals> - <goal>jar</goal> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-help-plugin</artifactId> - <version>2.2</version> - <executions> - <execution> - <phase>generate-resources</phase> - <goals> - <goal>effective-pom</goal> - </goals> - <configuration> - <output>${project.build.directory}/effective-pom.xml</output> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-install-plugin</artifactId> - <version>2.3.1</version> - <executions> - <execution> - <phase>install</phase> - <goals> - <goal>install-file</goal> - </goals> - <configuration> - <file>${project.build.directory}/${artifactId}-${project.version}.jar</file> - <pomFile>${project.build.directory}/effective-pom.xml</pomFile> - <!-- sources></sources --> - <!-- javadoc></javadoc --> - <groupId>${project.groupId}</groupId> - <artifactId>${project.artifactId}</artifactId> - <version>${project.version}</version> - <packaging>jar</packaging> - <!--classifier></classifier --> - <generatePom>true</generatePom> - <createChecksum>true</createChecksum> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java ---------------------------------------------------------------------- diff --git a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java index fa6b185..6616946 100644 --- a/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java +++ b/blur-util/src/main/java/org/apache/blur/utils/BlurConstants.java @@ -170,6 +170,8 @@ public class BlurConstants { public static final String BLUR_CLUSTER_NAME = "blur.cluster.name"; public static final String BLUR_CLUSTER; public static final String BLUR_HTTP_STATUS_RUNNING_PORT = "blur.http.status.running.port"; + public static final String BLUR_STREAM_SERVER_RUNNING_PORT = "blur.stream.server.running.port"; + public static final String BLUR_STREAM_SERVER_THREADS = "blur.stream.server.threads"; public static final String BLUR_SHARD_COMMAND_DRIVER_THREADS = "blur.shard.command.driver.threads"; public static final String BLUR_SHARD_COMMAND_WORKER_THREADS = "blur.shard.command.worker.threads"; http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/9ae5bf35/blur-util/src/main/resources/blur-default.properties ---------------------------------------------------------------------- diff --git a/blur-util/src/main/resources/blur-default.properties b/blur-util/src/main/resources/blur-default.properties index 19bdac8..a45d8e1 100644 --- a/blur-util/src/main/resources/blur-default.properties +++ b/blur-util/src/main/resources/blur-default.properties @@ -92,6 +92,9 @@ blur.shard.bind.address=0.0.0.0 # The default binding port of the shard server, 0 for random blur.shard.bind.port=40020 +# Experimental stream server. Set threads to positive number to enable. +blur.stream.server.threads=0 + # The number of command driver threads. blur.shard.command.driver.threads=16