http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java new file mode 100644 index 0000000..fb1cec2 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import com.google.protobuf.ServiceException; + +public abstract class ServerCallable<T> { + protected InetSocketAddress addr; + protected long startTime; + protected long endTime; + protected Class<?> protocol; + protected boolean asyncMode; + protected boolean closeConn; + protected RpcConnectionPool connPool; + + public abstract T call(NettyClientBase client) throws Exception; + + public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) { + this(connPool, addr, protocol, asyncMode, false); + } + + public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, + boolean asyncMode, boolean closeConn) { + this.connPool = connPool; + this.addr = addr; + this.protocol = protocol; + this.asyncMode = asyncMode; + this.closeConn = closeConn; + } + + public void beforeCall() { + this.startTime = System.currentTimeMillis(); + } + + public long getStartTime(){ + return startTime; + } + + public void afterCall() { + this.endTime = System.currentTimeMillis(); + } + + public long getEndTime(){ + return endTime; + } + + boolean abort = false; + public void abort() { + abort = true; + } + /** + * Run this instance with retries, timed waits, + * and refinds of missing regions. 
+ * + * @param <T> the type of the return value + * @return an object of type T + * @throws com.google.protobuf.ServiceException if a remote or network exception occurs + */ + public T withRetries() throws ServiceException { + //TODO configurable + final long pause = 500; //ms + final int numRetries = 3; + List<Throwable> exceptions = new ArrayList<Throwable>(); + + for (int tries = 0; tries < numRetries; tries++) { + NettyClientBase client = null; + try { + beforeCall(); + if(addr != null) { + client = connPool.getConnection(addr, protocol, asyncMode); + } + return call(client); + } catch (IOException ioe) { + exceptions.add(ioe); + if(abort) { + throw new ServiceException(ioe.getMessage(), ioe); + } + if (tries == numRetries - 1) { + throw new ServiceException("Giving up after tries=" + tries, ioe); + } + } catch (Throwable t) { + throw new ServiceException(t); + } finally { + afterCall(); + if(closeConn) { + connPool.closeConnection(client); + } else { + connPool.releaseConnection(client); + } + } + try { + Thread.sleep(pause * (tries + 1)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ServiceException("Giving up after tries=" + tries, e); + } + } + return null; + } + + /** + * Run this instance against the server once. 
+ * @param <T> the type of the return value + * @return an object of type T + * @throws java.io.IOException if a remote or network exception occurs + * @throws RuntimeException other unspecified error + */ + public T withoutRetries() throws IOException, RuntimeException { + NettyClientBase client = null; + try { + beforeCall(); + client = connPool.getConnection(addr, protocol, asyncMode); + return call(client); + } catch (Throwable t) { + Throwable t2 = translateException(t); + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } + } finally { + afterCall(); + if(closeConn) { + connPool.closeConnection(client); + } else { + connPool.releaseConnection(client); + } + } + } + + private static Throwable translateException(Throwable t) throws IOException { + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } + if (t instanceof RemoteException && t.getCause() != null) { + t = t.getCause(); + } + return t; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java new file mode 100644 index 0000000..113d181 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/TajoServiceException.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.ServiceException; +import org.apache.commons.lang.exception.ExceptionUtils; + +public class TajoServiceException extends ServiceException { + private String traceMessage; + private String protocol; + private String remoteAddress; + + public TajoServiceException(String message) { + super(message); + } + public TajoServiceException(String message, String traceMessage) { + super(message); + this.traceMessage = traceMessage; + } + + public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) { + super(message, cause); + + this.protocol = protocol; + this.remoteAddress = remoteAddress; + } + + public String getTraceMessage() { + if(traceMessage == null && getCause() != null){ + this.traceMessage = ExceptionUtils.getStackTrace(getCause()); + } + return traceMessage; + } + + public String getProtocol() { + return protocol; + } + + public String getRemoteAddress() { + return remoteAddress; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto new file mode 100644 index 0000000..f53f0d6 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/DummyProtos.proto @@ -0,0 +1,47 @@ +/* + * Copyright 2012 Database Lab., Korea Univ. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.tajo.rpc.test"; +option java_outer_classname = "DummyProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message MulRequest1 { + required int32 x1 = 1; + required int32 x2 = 2; +} + +message MulRequest2 { + required int32 x1 = 1; + required int32 x2 = 2; +} + +message MulResponse { + required int32 result1 = 1; + required int32 result2 = 2; +} + +message InnerNode { + required string instr = 1; +} + +message InnerRequest { + repeated InnerNode nodes = 1; +} + +message InnerResponse { + repeated InnerNode nodes = 1; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto new file mode 100644 index 0000000..69f43ed --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/RpcProtos.proto @@ -0,0 +1,32 @@ +/* + * Copyright 2012 Database Lab., Korea Univ. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.tajo.rpc"; +option java_outer_classname = "RpcProtos"; + +message RpcRequest { + required int32 id = 1; + required string method_name = 2; + optional bytes request_message = 3; +} + +message RpcResponse { + required int32 id = 1; + optional bytes response_message = 2; + optional string error_class = 3; + optional string error_message = 4; + optional string error_trace = 5; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto new file mode 100644 index 0000000..58640ea --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto @@ -0,0 +1,31 @@ +/* + * Copyright 2012 Database Lab., Korea Univ. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.tajo.rpc.test"; +option java_outer_classname = "DummyProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +import "TestProtos.proto"; + +service DummyProtocolService { + rpc sum (SumRequest) returns (SumResponse); + rpc echo (EchoMessage) returns (EchoMessage); + rpc getError (EchoMessage) returns (EchoMessage); + rpc getNull (EchoMessage) returns (EchoMessage); + rpc deley (EchoMessage) returns (EchoMessage); + rpc throwException (EchoMessage) returns (EchoMessage); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto new file mode 100644 index 0000000..5001c0e --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtos.proto @@ -0,0 +1,35 @@ +/* + * Copyright 2012 Database Lab., Korea Univ. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +option java_package = "org.apache.tajo.rpc.test"; +option java_outer_classname = "TestProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +message EchoMessage { + required string message = 1; +} + +message SumRequest { + required int32 x1 = 1; + required int64 x2 = 2; + required double x3 = 3; + required float x4 = 4; +} + +message SumResponse { + required double result = 1; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties b/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties new file mode 100644 index 0000000..2c4d991 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/log4j.properties @@ -0,0 +1,25 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
#

# log4j configuration used during build and unit tests

log4j.rootLogger=info,stdout
# The repository-wide threshold key is spelled "threshold"; the former
# "threshhold" spelling is not a key log4j reads, so the line had no effect.
log4j.threshold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p: %c (%M(%L)) - %m%n
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java new file mode 100644 index 0000000..a974a65 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.test.DummyProtocol; +import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface; +import org.apache.tajo.rpc.test.TestProtos.EchoMessage; +import org.apache.tajo.rpc.test.TestProtos.SumRequest; +import org.apache.tajo.rpc.test.TestProtos.SumResponse; +import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; + +public class TestAsyncRpc { + private static Log LOG = LogFactory.getLog(TestAsyncRpc.class); + private static String MESSAGE = "TestAsyncRpc"; + + double sum; + String echo; + + AsyncRpcServer server; + AsyncRpcClient client; + Interface stub; + DummyProtocolAsyncImpl service; + int retries; + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface SetupRpcConnection { + boolean setupRpcServer() default true; + boolean setupRpcClient() default true; + } + + @Rule + public ExternalResource resource = new ExternalResource() { + + private Description description; + + @Override + public Statement apply(Statement base, Description description) { + this.description = description; + return super.apply(base, description); + } + + @Override + protected void before() throws Throwable { + SetupRpcConnection setupRpcConnection = 
description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + setUpRpcServer(); + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + setUpRpcClient(); + } + } + + @Override + protected void after() { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + try { + tearDownRpcClient(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + try { + tearDownRpcServer(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + + }; + + public void setUpRpcServer() throws Exception { + service = new DummyProtocolAsyncImpl(); + server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 2); + server.start(); + } + + public void setUpRpcClient() throws Exception { + retries = 1; + + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT); + stub = client.getStub(); + } + + @AfterClass + public static void tearDownClass() throws Exception { + RpcChannelFactory.shutdownGracefully(); + } + + public void tearDownRpcServer() throws Exception { + if(server != null) { + server.shutdown(); + server = null; + } + } + + public void tearDownRpcClient() throws Exception { + if(client != null) { + client.close(); + client = null; + } + } + + boolean calledMarker = false; + + @Test + public void testRpc() throws Exception { + + SumRequest sumRequest = SumRequest.newBuilder() + .setX1(1) + .setX2(2) + .setX3(3.15d) + .setX4(2.0f).build(); + + stub.sum(null, sumRequest, new RpcCallback<SumResponse>() { + @Override + 
public void run(SumResponse parameter) { + sum = parameter.getResult(); + assertTrue(8.15d == sum); + } + }); + + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() { + @Override + public void run(EchoMessage parameter) { + echo = parameter.getMessage(); + assertEquals(MESSAGE, echo); + calledMarker = true; + } + }; + stub.echo(null, echoMessage, callback); + Thread.sleep(1000); + assertTrue(calledMarker); + } + + private CountDownLatch testNullLatch; + + @Test + public void testGetNull() throws Exception { + testNullLatch = new CountDownLatch(1); + stub.getNull(null, null, new RpcCallback<EchoMessage>() { + @Override + public void run(EchoMessage parameter) { + assertNull(parameter); + LOG.info("testGetNull retrieved"); + testNullLatch.countDown(); + } + }); + assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(service.getNullCalled); + } + + @Test + public void testCallFuture() throws Exception { + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.deley(null, echoMessage, future); + + assertFalse(future.isDone()); + assertEquals(future.get(), echoMessage); + assertTrue(future.isDone()); + } + + @Test + public void testCallFutureTimeout() throws Exception { + boolean timeout = false; + try { + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.deley(null, echoMessage, future); + + assertFalse(future.isDone()); + future.get(1, TimeUnit.SECONDS); + } catch (TimeoutException te) { + timeout = true; + } + assertTrue(timeout); + } + + @Test + public void testCallFutureDisconnected() throws Exception { + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new 
CallFuture<EchoMessage>(); + + tearDownRpcServer(); + + stub.echo(future.getController(), echoMessage, future); + EchoMessage response = future.get(); + + assertNull(response); + assertTrue(future.getController().failed()); + assertTrue(future.getController().errorText() != null); + } + + @Test + public void testStubDisconnected() throws Exception { + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + if (server != null) { + server.shutdown(true); + server = null; + } + + stub = client.getStub(); + stub.echo(future.getController(), echoMessage, future); + EchoMessage response = future.get(); + + assertNull(response); + assertTrue(future.getController().failed()); + assertTrue(future.getController().errorText() != null); + } + + @Test + @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) + public void testConnectionRetry() throws Exception { + retries = 10; + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); + service = new DummyProtocolAsyncImpl(); + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + //lazy startup + Thread serverThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + server = new AsyncRpcServer(DummyProtocol.class, + service, address, 2); + } catch (Exception e) { + fail(e.getMessage()); + } + server.start(); + } + }); + serverThread.start(); + + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + stub = client.getStub(); + stub.echo(future.getController(), 
echoMessage, future); + + assertFalse(future.isDone()); + assertEquals(echoMessage, future.get()); + assertTrue(future.isDone()); + } + + @Test + public void testConnectionFailure() throws Exception { + InetSocketAddress address = new InetSocketAddress("test", 0); + try { + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); + assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + } catch (Throwable throwable) { + fail(); + } + } + + @Test + @SetupRpcConnection(setupRpcClient=false) + public void testUnresolvedAddress() throws Exception { + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + Interface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.deley(null, echoMessage, future); + + assertFalse(future.isDone()); + assertEquals(future.get(), echoMessage); + assertTrue(future.isDone()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java new file mode 100644 index 0000000..10dd766 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ 
-0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import org.apache.tajo.rpc.test.DummyProtocol; +import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; +import org.apache.tajo.rpc.test.TestProtos.EchoMessage; +import org.apache.tajo.rpc.test.TestProtos.SumRequest; +import org.apache.tajo.rpc.test.TestProtos.SumResponse; +import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +public class TestBlockingRpc { + public static final String MESSAGE = "TestBlockingRpc"; + + private BlockingRpcServer server; + private BlockingRpcClient client; + private BlockingInterface stub; + 
private DummyProtocolBlockingImpl service; + private int retries; + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface SetupRpcConnection { + boolean setupRpcServer() default true; + boolean setupRpcClient() default true; + } + + @Rule + public ExternalResource resource = new ExternalResource() { + + private Description description; + + @Override + public Statement apply(Statement base, Description description) { + this.description = description; + return super.apply(base, description); + } + + @Override + protected void before() throws Throwable { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + setUpRpcServer(); + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + setUpRpcClient(); + } + } + + @Override + protected void after() { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + try { + tearDownRpcClient(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + try { + tearDownRpcServer(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + + }; + + public void setUpRpcServer() throws Exception { + service = new DummyProtocolBlockingImpl(); + server = new BlockingRpcServer(DummyProtocol.class, service, + new InetSocketAddress("127.0.0.1", 0), 2); + server.start(); + } + + public void setUpRpcClient() throws Exception { + retries = 1; + + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + stub = client.getStub(); 
+ } + + @AfterClass + public static void tearDownClass() throws Exception { + RpcChannelFactory.shutdownGracefully(); + } + + public void tearDownRpcServer() throws Exception { + if(server != null) { + server.shutdown(); + server = null; + } + } + + public void tearDownRpcClient() throws Exception { + if(client != null) { + client.close(); + client = null; + } + } + + @Test + public void testRpc() throws Exception { + SumRequest request = SumRequest.newBuilder() + .setX1(1) + .setX2(2) + .setX3(3.15d) + .setX4(2.0f).build(); + SumResponse response1 = stub.sum(null, request); + assertEquals(8.15d, response1.getResult(), 1e-15); + + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + EchoMessage response2 = stub.echo(null, message); + assertEquals(MESSAGE, response2.getMessage()); + } + + @Test + @SetupRpcConnection(setupRpcClient=false) + public void testRpcWithServiceCallable() throws Exception { + RpcConnectionPool pool = RpcConnectionPool.getPool(); + final SumRequest request = SumRequest.newBuilder() + .setX1(1) + .setX2(2) + .setX3(3.15d) + .setX4(2.0f).build(); + + SumResponse response = + new ServerCallable<SumResponse>(pool, + server.getListenAddress(), DummyProtocol.class, false) { + @Override + public SumResponse call(NettyClientBase client) throws Exception { + BlockingInterface stub2 = client.getStub(); + SumResponse response1 = stub2.sum(null, request); + return response1; + } + }.withRetries(); + + assertEquals(8.15d, response.getResult(), 1e-15); + + response = + new ServerCallable<SumResponse>(pool, + server.getListenAddress(), DummyProtocol.class, false) { + @Override + public SumResponse call(NettyClientBase client) throws Exception { + BlockingInterface stub2 = client.getStub(); + SumResponse response1 = stub2.sum(null, request); + return response1; + } + }.withoutRetries(); + + assertTrue(8.15d == response.getResult()); + pool.close(); + } + + @Test + public void testThrowException() throws Exception { + EchoMessage 
message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + try { + stub.throwException(null, message); + fail("RpcCall should throw exception"); + } catch (Throwable t) { + assertTrue(t instanceof TajoServiceException); + assertEquals("Exception Test", t.getMessage()); + TajoServiceException te = (TajoServiceException)t; + assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol()); + assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(), + te.getRemoteAddress()); + } + } + + @Test + @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) + public void testConnectionRetry() throws Exception { + retries = 10; + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); + + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + //lazy startup + Thread serverThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2); + } catch (Exception e) { + fail(e.getMessage()); + } + server.start(); + } + }); + serverThread.start(); + + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + stub = client.getStub(); + + EchoMessage response = stub.echo(null, message); + assertEquals(MESSAGE, response.getMessage()); + } + + @Test + public void testConnectionFailed() throws Exception { + NettyClientBase client = null; + + try { + int port = server.getListenAddress().getPort() + 1; + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(new 
InetSocketAddress("127.0.0.1", port)), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + client.close(); + } catch (Throwable ce){ + if (client != null) { + client.close(); + } + fail(); + } + } + + @Test + public void testGetNull() throws Exception { + assertNull(stub.getNull(null, null)); + assertTrue(service.getNullCalled); + } + + @Test + public void testShutdown() throws Exception { + final StringBuilder error = new StringBuilder(); + Thread callThread = new Thread() { + public void run() { + try { + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE) + .build(); + stub.deley(null, message); + } catch (Exception e) { + error.append(e.getMessage()); + } + synchronized(error) { + error.notifyAll(); + } + } + }; + + callThread.start(); + + final CountDownLatch latch = new CountDownLatch(1); + Thread shutdownThread = new Thread() { + public void run() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + try { + server.shutdown(); + server = null; + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }; + shutdownThread.start(); + + assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS)); + + assertTrue(latch.getCount() == 0); + + synchronized(error) { + error.wait(5 * 1000); + } + + if(!error.toString().isEmpty()) { + fail(error.toString()); + } + } + + @Test + @SetupRpcConnection(setupRpcClient=false) + public void testUnresolvedAddress() throws Exception { + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); + BlockingInterface stub = client.getStub(); + + 
EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + EchoMessage response2 = stub.echo(null, message); + assertEquals(MESSAGE, response2.getMessage()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java new file mode 100644 index 0000000..0ca7563 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc.test.impl; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface; +import org.apache.tajo.rpc.test.TestProtos.EchoMessage; +import org.apache.tajo.rpc.test.TestProtos.SumRequest; +import org.apache.tajo.rpc.test.TestProtos.SumResponse; + +public class DummyProtocolAsyncImpl implements Interface { + private static final Log LOG = + LogFactory.getLog(DummyProtocolAsyncImpl.class); + public boolean getNullCalled = false; + public boolean getErrorCalled = false; + + @Override + public void sum(RpcController controller, SumRequest request, + RpcCallback<SumResponse> done) { + + SumResponse response = SumResponse.newBuilder().setResult( + request.getX1()+request.getX2()+request.getX3()+request.getX4() + ).build(); + done.run(response); + } + + @Override + public void echo(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + + done.run(request); + } + + @Override + public void getError(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + LOG.info("noCallback is called"); + getErrorCalled = true; + controller.setFailed(request.getMessage()); + done.run(request); + } + + @Override + public void getNull(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + getNullCalled = true; + LOG.info("noCallback is called"); + done.run(null); + } + + @Override + public void deley(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + LOG.error(e.getMessage()); + } + + done.run(request); + } + + public void throwException(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + done.run(request); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java new file mode 100644 index 0000000..8d4b597 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc.test.impl; + +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; +import org.apache.tajo.rpc.test.TestProtos.EchoMessage; +import org.apache.tajo.rpc.test.TestProtos.SumRequest; +import org.apache.tajo.rpc.test.TestProtos.SumResponse; + +public class DummyProtocolBlockingImpl implements BlockingInterface { + private static final Log LOG = + LogFactory.getLog(DummyProtocolBlockingImpl.class); + public boolean getNullCalled = false; + public boolean getErrorCalled = false; + + @Override + public SumResponse sum(RpcController controller, SumRequest request) + throws ServiceException { + return SumResponse.newBuilder().setResult( + request.getX1()+request.getX2()+request.getX3()+request.getX4() + ).build(); + } + + @Override + public EchoMessage echo(RpcController controller, EchoMessage request) + throws ServiceException { + return EchoMessage.newBuilder(). 
+ setMessage(request.getMessage()).build(); + } + + @Override + public EchoMessage getError(RpcController controller, EchoMessage request) + throws ServiceException { + getErrorCalled = true; + controller.setFailed(request.getMessage()); + return request; + } + + @Override + public EchoMessage getNull(RpcController controller, EchoMessage request) + throws ServiceException { + getNullCalled = true; + LOG.info("noCallback is called"); + return null; + } + + @Override + public EchoMessage deley(RpcController controller, EchoMessage request) + throws ServiceException { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + //throw new ServiceException(e.getMessage(), e); + } + + return request; + } + + public EchoMessage throwException(RpcController controller, EchoMessage request) + throws ServiceException { + throw new ServiceException("Exception Test"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/pom.xml b/tajo-rpc/tajo-ws-rs/pom.xml new file mode 100644 index 0000000..a87a67a --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/pom.xml @@ -0,0 +1,218 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>tajo-project</artifactId> + <version>0.11.0-SNAPSHOT</version> + <groupId>org.apache.tajo</groupId> + <relativePath>../../tajo-project</relativePath> + </parent> + <packaging>jar</packaging> + <artifactId>tajo-ws-rs</artifactId> + <name>Tajo RESTful Container</name> + <description>RESTful Container Implementation based on Netty</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + </configuration> + <executions> + <execution> + <id>create-jar</id> + <phase>prepare-package</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + 
<artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-rpc-common</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-common</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + </dependency> + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>dist</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>dist</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <echo file="${project.build.directory}/dist-layout-stitching.sh"> + run() { + echo "\$ 
${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" + echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo RESTful Container dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + </echo> + <exec executable="sh" dir="${project.build.directory}" failonerror="true"> + <arg line="./dist-layout-stitching.sh" /> + </exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> + +</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java new file mode 100644 index 0000000..a1ea72b --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestChannelInitializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.stream.ChunkedWriteHandler; + +/** + * Default Channel Initializer for Netty Rest server. 
+ */ +public class NettyRestChannelInitializer extends ChannelInitializer<Channel> { + + private ChannelHandler handler; + + public NettyRestChannelInitializer(ChannelHandler handler) { + this.handler = handler; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + + pipeline.addLast(new HttpServerCodec()); + pipeline.addLast(new HttpObjectAggregator(1 << 16)); + pipeline.addLast(new ChunkedWriteHandler()); + pipeline.addLast(handler); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java new file mode 100644 index 0000000..81d1eeb --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainer.java @@ -0,0 +1,319 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.netty; + +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.Principal; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import io.netty.buffer.*; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GenericFutureListener; + +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.SecurityContext; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.glassfish.hk2.api.ServiceLocator; +import org.glassfish.jersey.internal.MapPropertiesDelegate; +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.ContainerException; +import org.glassfish.jersey.server.ContainerRequest; +import org.glassfish.jersey.server.ContainerResponse; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.server.internal.ConfigHelper; +import org.glassfish.jersey.server.spi.Container; +import org.glassfish.jersey.server.spi.ContainerLifecycleListener; +import 
org.glassfish.jersey.server.spi.ContainerResponseWriter; + +/** + * Jersey Container implementation on Netty + */ +@Sharable +public class NettyRestHandlerContainer extends ChannelDuplexHandler implements Container { + + private static Log LOG = LogFactory.getLog(NettyRestHandlerContainer.class); + + private String rootPath; + + private ApplicationHandler applicationHandler; + private ContainerLifecycleListener lifecycleListener; + + NettyRestHandlerContainer(Application application) { + this(new ApplicationHandler(application)); + } + + NettyRestHandlerContainer(Application application, ServiceLocator parentLocator) { + this(new ApplicationHandler(application, null, parentLocator)); + } + + NettyRestHandlerContainer(ApplicationHandler appHandler) { + applicationHandler = appHandler; + lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + } + + @Override + public ResourceConfig getConfiguration() { + return applicationHandler.getConfiguration(); + } + + @Override + public void reload() { + reload(getConfiguration()); + } + + @Override + public void reload(ResourceConfig configuration) { + lifecycleListener.onShutdown(this); + applicationHandler = new ApplicationHandler(configuration); + lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onReload(this); + lifecycleListener.onStartup(this); + + if (LOG.isDebugEnabled()) { + LOG.debug("NettyRestHandlerContainer reloaded."); + } + } + + public void setRootPath(String rootPath) { + String tempRootPath = rootPath; + if (tempRootPath == null || tempRootPath.isEmpty()) { + tempRootPath = "/"; + } else if (tempRootPath.charAt(tempRootPath.length() - 1) != '/') { + tempRootPath += "/"; + } + this.rootPath = tempRootPath; + } + + private URI getBaseUri(ChannelHandlerContext ctx, FullHttpRequest request) { + URI baseUri; + String scheme; + + if (ctx.pipeline().get(SslHandler.class) == null) { + scheme = "http"; + } else { + scheme = "https"; +
} + + List<String> hosts = request.headers().getAll(HttpHeaders.Names.HOST); + try { + if (hosts != null && hosts.size() > 0) { + baseUri = new URI(scheme + "://" + hosts.get(0) + rootPath); + } else { + InetSocketAddress localAddress = (InetSocketAddress) ctx.channel().localAddress(); + baseUri = new URI(scheme, null, localAddress.getHostName(), localAddress.getPort(), + rootPath, null, null); + } + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + + return baseUri; + } + + protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { + URI baseUri = getBaseUri(ctx, request); + URI requestUri = baseUri.resolve(request.getUri()); + ByteBuf responseContent = PooledByteBufAllocator.DEFAULT.buffer(); + FullHttpResponse response = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, responseContent); + + NettyRestResponseWriter responseWriter = new NettyRestResponseWriter(ctx, response); + ContainerRequest containerRequest = new ContainerRequest(baseUri, requestUri, + request.getMethod().name(), getSecurityContext(), new MapPropertiesDelegate()); + containerRequest.setEntityStream(new ByteBufInputStream(request.content())); + + HttpHeaders httpHeaders = request.headers(); + for (String headerName: httpHeaders.names()) { + List<String> headerValues = httpHeaders.getAll(headerName); + containerRequest.headers(headerName, headerValues); + } + containerRequest.setWriter(responseWriter); + try { + applicationHandler.handle(containerRequest); + } finally { + responseWriter.releaseConnection(); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + boolean needRelease = true; + try { + if (msg instanceof FullHttpRequest) { + FullHttpRequest request = (FullHttpRequest) msg; + messageReceived(ctx, request); + } else { + needRelease = false; + ctx.fireChannelRead(msg); + } + } finally { + if (needRelease) { + 
ReferenceCountUtil.release(msg); + } + } + } + + private SecurityContext getSecurityContext() { + return new SecurityContext() { + + @Override + public boolean isUserInRole(String role) { + return false; + } + + @Override + public boolean isSecure() { + return false; + } + + @Override + public Principal getUserPrincipal() { + return null; + } + + @Override + public String getAuthenticationScheme() { + return null; + } + }; + } + + /** + * Internal class for writing content on REST service. + */ + static class NettyRestResponseWriter implements ContainerResponseWriter { + + private final ChannelHandlerContext ctx; + private final FullHttpResponse response; + private final AtomicBoolean closed; + + public NettyRestResponseWriter(ChannelHandlerContext ctx, FullHttpResponse response) { + this.ctx = ctx; + this.response = response; + this.closed = new AtomicBoolean(false); + } + + @Override + public void commit() { + if (closed.compareAndSet(false, true)) { + ctx.write(response); + sendLastHttpContent(); + } + } + + @Override + public boolean enableResponseBuffering() { + return false; + } + + @Override + public void failure(Throwable error) { + try { + sendError(HttpResponseStatus.INTERNAL_SERVER_ERROR, error); + } finally { + if (ctx.channel().isActive()) { + ctx.close(); + } + } + } + + private void sendError(HttpResponseStatus status, final Throwable error) { + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer(error.getMessage(), CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); + ChannelPromise promise = ctx.newPromise(); + promise.addListener(new GenericFutureListener<ChannelFuture>() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + throw new ContainerException(error); + } + } + }); + + ctx.writeAndFlush(response, promise); + } + + @Override + public void setSuspendTimeout(long 
timeOut, TimeUnit timeUnit) throws IllegalStateException { + throw new UnsupportedOperationException("setSuspendTimeout is not supported on this container."); + } + + @Override + public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) { + throw new UnsupportedOperationException("suspend is not supported on this container."); + } + + @Override + public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse context) + throws ContainerException { + MultivaluedMap<String, String> responseHeaders = context.getStringHeaders(); + HttpHeaders nettyHeaders = response.headers(); + + for (Entry<String, List<String>> entry: responseHeaders.entrySet()) { + nettyHeaders.add(entry.getKey(), entry.getValue()); + } + + int status = context.getStatus(); + + response.setStatus(HttpResponseStatus.valueOf(status)); + return new ByteBufOutputStream(response.content()); + } + + private void sendLastHttpContent() { + ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT) + .addListener(ChannelFutureListener.CLOSE); + } + + private void releaseConnection() { + if (closed.compareAndSet(false, true)) { + String warnMessage = "ResponseWriter did not be commited."; + LOG.warn(warnMessage); + failure(new IllegalStateException(warnMessage)); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java new file mode 100644 index 0000000..7481cfb --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestHandlerContainerProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.ChannelHandler; + +import javax.ws.rs.ProcessingException; + +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.spi.ContainerProvider; + +/** + * Container Provider for NettyRestHandlerContainer + */ +public final class NettyRestHandlerContainerProvider implements ContainerProvider { + + @Override + public <T> T createContainer(Class<T> type, ApplicationHandler application) throws ProcessingException { + if (type != NettyRestHandlerContainer.class && + (type == null || !ChannelHandler.class.isAssignableFrom(type))) { + return null; + } + return type.cast(new NettyRestHandlerContainer(application)); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java new file mode 100644 index 0000000..f7fe148 --- /dev/null +++ 
b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import io.netty.channel.ChannelHandler; +import java.net.InetSocketAddress; + +import org.apache.tajo.rpc.NettyServerBase; + +/** + * JAX-RS Http Server on Netty implementation. + */ +public class NettyRestServer extends NettyServerBase { + + private ChannelHandler handler; + private int workerCount; + + public NettyRestServer(InetSocketAddress address, int workerCount) { + this("NettyRestService", address, workerCount); + } + + public NettyRestServer(String serviceName, InetSocketAddress address, int workerCount) { + super(serviceName, address); + + this.workerCount = workerCount; + } + + public ChannelHandler getHandler() { + return handler; + } + + public void setHandler(ChannelHandler handler) { + this.handler = handler; + } + + /** + * Bind desired port and start network service. Before starting network service, {@link NettyRestServer} + * will initialize its configuration. 
+ * + */ + @Override + public void start() { + if (handler == null) { + throw new IllegalStateException("ChannelHandler is null."); + } + + super.init(new NettyRestChannelInitializer(handler), workerCount); + super.start(); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java new file mode 100644 index 0000000..5d2eea1 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerFactory.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.netty; + +import java.net.InetSocketAddress; +import java.net.URI; + +import org.glassfish.hk2.api.ServiceLocator; +import org.glassfish.jersey.server.ResourceConfig; + +/** + * Factory class for creating {@link NettyRestServer} instances + */ +public final class NettyRestServerFactory { + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, true); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, int workerCount, + boolean start) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration), workerCount, start); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, + ServiceLocator parentLocator, int workerCount) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, true); + } + + public static NettyRestServer createNettyRestServer(URI uri, ResourceConfig configuration, + ServiceLocator parentLocator, int workerCount, boolean start) { + return createNettyRestServer(uri, new NettyRestHandlerContainer(configuration, parentLocator), workerCount, start); + } + + /** + * Creates {@link NettyRestServer} instances with JAX-RS application. 
+ * + * @param uri + * @param handler + * @param start + * @return + */ + private static NettyRestServer createNettyRestServer(URI uri, NettyRestHandlerContainer handler, int workerCount, + boolean start) { + if (uri == null) { + throw new IllegalArgumentException("uri is null."); + } + + String schemeString = uri.getScheme(); + if (!schemeString.equalsIgnoreCase("http") && !schemeString.equalsIgnoreCase("https")) { + throw new IllegalArgumentException("scheme of this uri (" + uri.toString() + ") should be http or https."); + } + + int port = uri.getPort(); + if (port == -1) { + throw new IllegalArgumentException("Port number should be provided."); + } + + handler.setRootPath(uri.getPath()); + + InetSocketAddress bindAddress = new InetSocketAddress(port); + NettyRestServer nettyRestServer = new NettyRestServer("Tajo-REST", bindAddress, workerCount); + + nettyRestServer.setHandler(handler); + nettyRestServer.addListener(new NettyRestServerListener(handler)); + + if (start) { + nettyRestServer.start(); + } + + return nettyRestServer; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java new file mode 100644 index 0000000..ecd5bb0 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/NettyRestServerListener.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty; + +import org.apache.tajo.rpc.RpcEventListener; +import org.glassfish.jersey.server.ApplicationHandler; +import org.glassfish.jersey.server.internal.ConfigHelper; +import org.glassfish.jersey.server.spi.Container; +import org.glassfish.jersey.server.spi.ContainerLifecycleListener; + +/** + * Event subscriber for netty rest service. + */ +public class NettyRestServerListener implements RpcEventListener { + + private Container container; + + public NettyRestServerListener(Container container) { + this.container = container; + } + + @Override + public void onAfterInit(Object obj) { + + } + + @Override + public void onAfterShutdown(Object obj) { + ApplicationHandler applicationHandler = new ApplicationHandler(container.getConfiguration()); + ContainerLifecycleListener lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onShutdown(container); + } + + @Override + public void onAfterStart(Object obj) { + ApplicationHandler applicationHandler = new ApplicationHandler(container.getConfiguration()); + ContainerLifecycleListener lifecycleListener = ConfigHelper.getContainerLifecycleListener(applicationHandler); + lifecycleListener.onStartup(container); + } + + @Override + public void onBeforeInit(Object obj) { + + } + + @Override + public void onBeforeShutdown(Object obj) { + + } + + @Override + public 
void onBeforeStart(Object obj) { + + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java new file mode 100644 index 0000000..26086d4 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonFeature.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.netty.gson; + +import javax.ws.rs.core.Feature; +import javax.ws.rs.core.FeatureContext; +import javax.ws.rs.ext.MessageBodyReader; +import javax.ws.rs.ext.MessageBodyWriter; + +public class GsonFeature implements Feature { + + @Override + public boolean configure(FeatureContext featureContext) { + featureContext.register(GsonReader.class, MessageBodyReader.class); + featureContext.register(GsonWriter.class, MessageBodyWriter.class); + return true; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java new file mode 100644 index 0000000..4d6e440 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonReader.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.ws.rs.netty.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import javax.ws.rs.Consumes; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyReader; +import java.io.*; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +/** + * Custom message body reader with Gson feature. + */ +@Consumes(MediaType.APPLICATION_JSON) +public class GsonReader<T> implements MessageBodyReader<T> { + + @Override + public boolean isReadable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return GsonUtil.isJsonType(mediaType); + } + + @Override + public T readFrom(Class<T> aClass, Type type, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, String> multivaluedMap, InputStream inputStream) + throws IOException, WebApplicationException { + Gson gson = new GsonBuilder().create(); + Reader reader = new BufferedReader(new InputStreamReader(inputStream)); + return gson.fromJson(reader, type); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java new file mode 100644 index 0000000..f16cb96 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import javax.ws.rs.core.MediaType; + +public class GsonUtil { + + public static boolean isJsonType(MediaType mediaType) { + if (mediaType != null) { + String subType = mediaType.getSubtype(); + return "json".equalsIgnoreCase(subType) || subType.endsWith("+json"); + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java new file mode 100644 index 0000000..d215611 --- /dev/null +++ b/tajo-rpc/tajo-ws-rs/src/main/java/org/apache/tajo/ws/rs/netty/gson/GsonWriter.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ws.rs.netty.gson; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.ext.MessageBodyWriter; +import java.io.*; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; + +/** + * custom message body writer with Gson feature. + */ +@Produces(MediaType.APPLICATION_JSON) +public class GsonWriter<T> implements MessageBodyWriter<T> { + + @Override + public boolean isWriteable(Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return GsonUtil.isJsonType(mediaType); + } + + @Override + public long getSize(T t, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType) { + return 0; + } + + @Override + public void writeTo(T t, Class<?> aClass, Type type, Annotation[] annotations, MediaType mediaType, + MultivaluedMap<String, Object> multivaluedMap, OutputStream outputStream) + throws IOException, WebApplicationException { + Gson gson = new GsonBuilder().create(); + Writer writer = new BufferedWriter(new OutputStreamWriter(outputStream)); + + gson.toJson(t, type, writer); + writer.flush(); + } +}
