Repository: incubator-blur
Updated Branches:
  refs/heads/v2_command 64437cf0a -> c607b6678
Making some progress toward the next gen command api. Project: http://git-wip-us.apache.org/repos/asf/incubator-blur/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-blur/commit/4d8b0bc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-blur/tree/4d8b0bc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-blur/diff/4d8b0bc7 Branch: refs/heads/v2_command Commit: 4d8b0bc74425f53d715bf4c4feca66a3211a57ae Parents: 64437cf Author: Aaron McCurry <amccu...@gmail.com> Authored: Mon Dec 7 14:16:49 2015 -0500 Committer: Aaron McCurry <amccu...@gmail.com> Committed: Mon Dec 7 14:16:49 2015 -0500 ---------------------------------------------------------------------- .../apache/blur/server/FilteredBlurServer.java | 6 +++ .../apache/blur/server/ShardServerContext.java | 15 +++++- .../blur/server/ShardServerEventHandler.java | 2 +- .../blur/server/command/ServerCommand.java | 25 ++++++++++ .../server/command/ServerCommandManager.java | 23 +++++++++ .../blur/thrift/BlurControllerServer.java | 6 +++ .../org/apache/blur/thrift/BlurShardServer.java | 36 ++++++++++++++ .../server/cache/ThriftCacheServerTest.java | 6 +++ .../blur/thrift/util/ServerCommandExample.java | 52 ++++++++++++++++++++ 9 files changed, 169 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java index ff40e5a..723d0c0 100644 --- a/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java +++ b/blur-core/src/main/java/org/apache/blur/server/FilteredBlurServer.java @@ -30,6 +30,7 @@ import org.apache.blur.thrift.generated.BlurQueryStatus; import 
org.apache.blur.thrift.generated.BlurResults; import org.apache.blur.thrift.generated.ColumnDefinition; import org.apache.blur.thrift.generated.CommandDescriptor; +import org.apache.blur.thrift.generated.CommandRequest; import org.apache.blur.thrift.generated.CommandStatus; import org.apache.blur.thrift.generated.CommandStatusState; import org.apache.blur.thrift.generated.FetchResult; @@ -327,4 +328,9 @@ public class FilteredBlurServer implements Iface { _iface.loadIndex(table, externalIndexPaths); } + @Override + public void executeCommand(CommandRequest commandRequest) throws TException { + _iface.executeCommand(commandRequest); + } + } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java index 59b142d..2295856 100644 --- a/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java +++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerContext.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.blur.log.Log; import org.apache.blur.log.LogFactory; import org.apache.blur.lucene.search.IndexSearcherCloseable; +import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol; import org.apache.blur.thirdparty.thrift_0_9_0.server.ServerContext; /** @@ -40,9 +41,13 @@ public class ShardServerContext extends BlurServerContext implements ServerConte private final static Map<Thread, ShardServerContext> _threadsToContext = new ConcurrentHashMap<Thread, ShardServerContext>(); private final Map<String, IndexSearcherCloseable> _indexSearcherMap = new ConcurrentHashMap<String, IndexSearcherCloseable>(); + private final TProtocol _input; + private final TProtocol _output; - public ShardServerContext(SocketAddress 
localSocketAddress, SocketAddress remoteSocketAddress) { + public ShardServerContext(SocketAddress localSocketAddress, SocketAddress remoteSocketAddress, TProtocol input, TProtocol output) { super(localSocketAddress, remoteSocketAddress); + _input = input; + _output = output; } /** @@ -145,4 +150,12 @@ public class ShardServerContext extends BlurServerContext implements ServerConte return table + "/" + shard; } + public TProtocol getInput() { + return _input; + } + + public TProtocol getOutput() { + return _output; + } + } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java index 3886ce1..5046fda 100644 --- a/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java +++ b/blur-core/src/main/java/org/apache/blur/server/ShardServerEventHandler.java @@ -86,7 +86,7 @@ public class ShardServerEventHandler implements TServerEventHandler { } _connectionMeter.mark(); _connections.incrementAndGet(); - return new ShardServerContext(localSocketAddress, remoteSocketAddress); + return new ShardServerContext(localSocketAddress, remoteSocketAddress, input, output); } @Override http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/server/command/ServerCommand.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/command/ServerCommand.java b/blur-core/src/main/java/org/apache/blur/server/command/ServerCommand.java new file mode 100644 index 0000000..19290a1 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/server/command/ServerCommand.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.server.command; + +import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol; + +public abstract class ServerCommand { + + public abstract void request(TProtocol input, TProtocol output) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/server/command/ServerCommandManager.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/server/command/ServerCommandManager.java b/blur-core/src/main/java/org/apache/blur/server/command/ServerCommandManager.java new file mode 100644 index 0000000..f421021 --- /dev/null +++ b/blur-core/src/main/java/org/apache/blur/server/command/ServerCommandManager.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.blur.server.command; + +public class ServerCommandManager { + + + +} http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java index b24bb03..2e5ca08 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java @@ -91,6 +91,7 @@ import org.apache.blur.thrift.generated.BlurResult; import org.apache.blur.thrift.generated.BlurResults; import org.apache.blur.thrift.generated.ColumnDefinition; import org.apache.blur.thrift.generated.CommandDescriptor; +import org.apache.blur.thrift.generated.CommandRequest; import org.apache.blur.thrift.generated.CommandStatus; import org.apache.blur.thrift.generated.CommandStatusState; import org.apache.blur.thrift.generated.ErrorType; @@ -1873,4 +1874,9 @@ public class BlurControllerServer extends TableAdmin implements Iface { throw new BException("Unknown error while trying to validate indexes for table [{0}]", e, table); } } + + @Override + public void executeCommand(CommandRequest commandRequest) throws TException { + throw new RuntimeException("Not supported."); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java ---------------------------------------------------------------------- diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java index 625ed26..4b72699 100644 --- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java +++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -50,7 +51,9 @@ import org.apache.blur.manager.writer.BlurIndex; import org.apache.blur.server.ShardServerContext; import org.apache.blur.server.TableContext; import org.apache.blur.server.TableContextFactory; +import org.apache.blur.server.command.ServerCommand; import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol; import org.apache.blur.thrift.generated.Arguments; import org.apache.blur.thrift.generated.Blur.Iface; import org.apache.blur.thrift.generated.BlurException; @@ -58,6 +61,7 @@ import org.apache.blur.thrift.generated.BlurQuery; import org.apache.blur.thrift.generated.BlurQueryStatus; import org.apache.blur.thrift.generated.BlurResults; import org.apache.blur.thrift.generated.CommandDescriptor; +import org.apache.blur.thrift.generated.CommandRequest; import org.apache.blur.thrift.generated.CommandStatus; import org.apache.blur.thrift.generated.CommandStatusState; import org.apache.blur.thrift.generated.FetchResult; @@ -775,4 +779,36 @@ public class BlurShardServer extends TableAdmin implements Iface { throw new RuntimeException("Shard servers do not support this call."); } + @Override + public void 
executeCommand(CommandRequest commandRequest) throws TException { + ShardServerContext shardServerContext = ShardServerContext.getShardServerContext(); + TProtocol input = shardServerContext.getInput(); + TProtocol output = shardServerContext.getOutput(); + ServerCommand command = getServerCommand(commandRequest); + try { + command.request(input, output); + } catch (Exception e) { + if (e instanceof TException) { + throw (TException) e; + } + throw new RuntimeException(e); + } + } + + private ServerCommand getServerCommand(CommandRequest commandRequest) { + return new ServerCommand() { + @Override + public void request(TProtocol input, TProtocol output) throws Exception { + output.writeBool(true); + long count = input.readI64(); + // long count = 1000; + Random random = new Random(); + for (long l = 0; l < count; l++) { + output.writeByte((byte) random.nextInt()); + } + output.getTransport().flush(); + } + }; + } + } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java ---------------------------------------------------------------------- diff --git a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java index 73ef9ed..4f977bb 100644 --- a/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java +++ b/blur-core/src/test/java/org/apache/blur/server/cache/ThriftCacheServerTest.java @@ -40,6 +40,7 @@ import org.apache.blur.thrift.generated.BlurQueryStatus; import org.apache.blur.thrift.generated.BlurResults; import org.apache.blur.thrift.generated.ColumnDefinition; import org.apache.blur.thrift.generated.CommandDescriptor; +import org.apache.blur.thrift.generated.CommandRequest; import org.apache.blur.thrift.generated.CommandStatus; import org.apache.blur.thrift.generated.CommandStatusState; import org.apache.blur.thrift.generated.FetchResult; @@ 
-526,6 +527,11 @@ public class ThriftCacheServerTest { throw new RuntimeException("Not implemented."); } + @Override + public void executeCommand(CommandRequest commandRequest) throws TException { + throw new RuntimeException("Not implemented."); + } + }; } http://git-wip-us.apache.org/repos/asf/incubator-blur/blob/4d8b0bc7/blur-thrift/src/main/java/org/apache/blur/thrift/util/ServerCommandExample.java ---------------------------------------------------------------------- diff --git a/blur-thrift/src/main/java/org/apache/blur/thrift/util/ServerCommandExample.java b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ServerCommandExample.java new file mode 100644 index 0000000..5dc4c86 --- /dev/null +++ b/blur-thrift/src/main/java/org/apache/blur/thrift/util/ServerCommandExample.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.blur.thrift.util; + +import java.io.IOException; + +import org.apache.blur.thirdparty.thrift_0_9_0.TException; +import org.apache.blur.thirdparty.thrift_0_9_0.protocol.TProtocol; +import org.apache.blur.thrift.BlurClientManager; +import org.apache.blur.thrift.Connection; +import org.apache.blur.thrift.generated.Blur.Client; +import org.apache.blur.thrift.generated.CommandRequest; + +public class ServerCommandExample { + + public static void main(String[] args) throws TException, IOException { + CommandRequest commandRequest = new CommandRequest(); + commandRequest.setName("cool"); + Client client = BlurClientManager.getClientPool().getClient(new Connection("localhost:40020")); + client.executeCommand(commandRequest); + + TProtocol input = client.getInputProtocol(); + if (input.readBool()) { + TProtocol output = client.getOutputProtocol(); + output.writeI64(1000); + output.getTransport().flush(); + } + + long t = 0; + for (long l = 0; l < 1000; l++) { + t+=input.readByte(); + } + System.out.println(t); + + System.out.println(client.tableList()); + } + +}