TAJO-1337: Implements common modules to handle RESTful API Closes #399
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/a9ae3cab Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/a9ae3cab Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/a9ae3cab Branch: refs/heads/master Commit: a9ae3cab69526294475a771014e9c0e49c80462b Parents: 82d44af Author: Jihun Kang <[email protected]> Authored: Wed Mar 18 23:57:06 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Wed Mar 18 23:57:06 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-catalog/tajo-catalog-client/pom.xml | 2 +- .../tajo-catalog-drivers/tajo-hcatalog/pom.xml | 2 +- tajo-catalog/tajo-catalog-server/pom.xml | 2 +- tajo-cli/pom.xml | 2 +- tajo-client/pom.xml | 2 +- .../java/org/apache/tajo/conf/TajoConf.java | 2 + tajo-core/pom.xml | 2 +- tajo-dist/pom.xml | 7 +- tajo-project/pom.xml | 31 ++ tajo-pullserver/pom.xml | 2 +- tajo-rpc/pom.xml | 180 ++-------- .../org/apache/tajo/rpc/AsyncRpcClient.java | 227 ------------ .../org/apache/tajo/rpc/AsyncRpcServer.java | 148 -------- .../org/apache/tajo/rpc/BlockingRpcClient.java | 273 --------------- .../org/apache/tajo/rpc/BlockingRpcServer.java | 147 -------- .../java/org/apache/tajo/rpc/CallFuture.java | 84 ----- .../apache/tajo/rpc/DefaultRpcController.java | 65 ---- .../org/apache/tajo/rpc/NettyClientBase.java | 221 ------------ .../org/apache/tajo/rpc/NettyRpcController.java | 63 ---- .../org/apache/tajo/rpc/NettyServerBase.java | 205 ----------- .../java/org/apache/tajo/rpc/NullCallback.java | 38 -- .../tajo/rpc/ProtoChannelInitializer.java | 50 --- .../apache/tajo/rpc/RemoteCallException.java | 69 ---- .../org/apache/tajo/rpc/RemoteException.java | 37 -- .../tajo/rpc/RetriesExhaustedException.java | 104 ------ .../org/apache/tajo/rpc/RpcChannelFactory.java | 182 ---------- .../org/apache/tajo/rpc/RpcConnectionPool.java | 190 ---------- .../main/java/org/apache/tajo/rpc/RpcUtils.java | 122 ------- .../org/apache/tajo/rpc/ServerCallable.java | 162 --------- .../apache/tajo/rpc/TajoServiceException.java | 58 --- tajo-rpc/src/main/proto/DummyProtos.proto | 47 --- tajo-rpc/src/main/proto/RpcProtos.proto | 32 -- tajo-rpc/src/main/proto/TestProtocol.proto | 31 -- tajo-rpc/src/main/proto/TestProtos.proto | 35 -- tajo-rpc/src/test/java/log4j.properties | 25 -- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 345 ------------------ .../org/apache/tajo/rpc/TestBlockingRpc.java | 349 ------------------- .../rpc/test/impl/DummyProtocolAsyncImpl.java | 86 ----- .../test/impl/DummyProtocolBlockingImpl.java | 83 ----- tajo-rpc/tajo-rpc-common/pom.xml | 216 ++++++++++++ .../org/apache/tajo/rpc/NettyServerBase.java | 243 +++++++++++++ .../org/apache/tajo/rpc/RemoteException.java | 37 ++ .../tajo/rpc/RetriesExhaustedException.java | 104 ++++++ .../org/apache/tajo/rpc/RpcChannelFactory.java | 182 ++++++++++ .../org/apache/tajo/rpc/RpcEventListener.java | 62 ++++ .../main/java/org/apache/tajo/rpc/RpcUtils.java | 122 +++++++ tajo-rpc/tajo-rpc-protobuf/pom.xml | 274 +++++++++++++++ .../org/apache/tajo/rpc/AsyncRpcClient.java | 227 ++++++++++++ .../org/apache/tajo/rpc/AsyncRpcServer.java | 148 ++++++++ .../org/apache/tajo/rpc/BlockingRpcClient.java | 273 +++++++++++++++ .../org/apache/tajo/rpc/BlockingRpcServer.java | 147 ++++++++ .../java/org/apache/tajo/rpc/CallFuture.java | 84 +++++ .../apache/tajo/rpc/DefaultRpcController.java | 65 ++++ .../org/apache/tajo/rpc/NettyClientBase.java | 221 ++++++++++++ .../org/apache/tajo/rpc/NettyRpcController.java | 63 ++++ .../java/org/apache/tajo/rpc/NullCallback.java | 38 ++ .../tajo/rpc/ProtoChannelInitializer.java | 50 +++ .../apache/tajo/rpc/RemoteCallException.java | 69 ++++ .../org/apache/tajo/rpc/RemoteException.java | 37 ++ .../tajo/rpc/RetriesExhaustedException.java | 104 ++++++ .../org/apache/tajo/rpc/RpcConnectionPool.java | 190 ++++++++++ .../org/apache/tajo/rpc/ServerCallable.java | 162 +++++++++ .../apache/tajo/rpc/TajoServiceException.java | 58 +++ .../src/main/proto/DummyProtos.proto | 47 +++ .../src/main/proto/RpcProtos.proto | 32 ++ .../src/main/proto/TestProtocol.proto | 31 ++ .../src/main/proto/TestProtos.proto | 35 ++ .../src/test/java/log4j.properties | 25 ++ .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 345 ++++++++++++++++++ .../org/apache/tajo/rpc/TestBlockingRpc.java | 349 +++++++++++++++++++ .../rpc/test/impl/DummyProtocolAsyncImpl.java | 86 +++++ .../test/impl/DummyProtocolBlockingImpl.java | 83 +++++ tajo-rpc/tajo-ws-rs/pom.xml | 218 ++++++++++++ .../rs/netty/NettyRestChannelInitializer.java | 50 +++ .../ws/rs/netty/NettyRestHandlerContainer.java | 319 +++++++++++++++++ .../NettyRestHandlerContainerProvider.java | 42 +++ .../tajo/ws/rs/netty/NettyRestServer.java | 67 ++++ .../ws/rs/netty/NettyRestServerFactory.java | 89 +++++ .../ws/rs/netty/NettyRestServerListener.java | 72 ++++ .../tajo/ws/rs/netty/gson/GsonFeature.java | 34 ++ .../tajo/ws/rs/netty/gson/GsonReader.java | 52 +++ .../apache/tajo/ws/rs/netty/gson/GsonUtil.java | 32 ++ .../tajo/ws/rs/netty/gson/GsonWriter.java | 59 ++++ .../NettyRestHandlerContainerProviderTest.java | 66 ++++ .../tajo/ws/rs/netty/NettyRestServerTest.java | 137 ++++++++ .../ws/rs/netty/testapp1/TestApplication1.java | 38 ++ .../ws/rs/netty/testapp1/TestResource1.java | 36 ++ .../ws/rs/netty/testapp2/DirectoriesDao.java | 39 +++ .../rs/netty/testapp2/DirectoriesResource.java | 85 +++++ .../tajo/ws/rs/netty/testapp2/Directory.java | 52 +++ .../testapp2/FileManagementApplication.java | 35 ++ 92 files changed, 5809 insertions(+), 3636 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index c3f2691..4875cab 100644 --- a/CHANGES +++ b/CHANGES @@ -76,6 +76,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1337: Implements common modules to handle RESTful API. (jihun) + TAJO-1329: Improve Schema class to support nested struct support. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-client/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/pom.xml b/tajo-catalog/tajo-catalog-client/pom.xml index 98b85a8..84e2aa3 100644 --- a/tajo-catalog/tajo-catalog-client/pom.xml +++ b/tajo-catalog/tajo-catalog-client/pom.xml @@ -135,7 +135,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml index fe8f34a..7c3efdd 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml @@ -109,7 +109,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-catalog/tajo-catalog-server/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml index 501f9af..8efeecf 100644 --- a/tajo-catalog/tajo-catalog-server/pom.xml +++ b/tajo-catalog/tajo-catalog-server/pom.xml @@ -141,7 +141,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-cli/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-cli/pom.xml b/tajo-cli/pom.xml index 684c298..e8360ad 100644 --- a/tajo-cli/pom.xml +++ b/tajo-cli/pom.xml @@ -140,7 +140,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>commons-cli</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-client/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml index 692e1b5..e6be476 100644 --- a/tajo-client/pom.xml +++ b/tajo-client/pom.xml @@ -195,7 +195,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 4ed8097..5b569d5 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -249,6 +249,8 @@ public class TajoConf extends Configuration { Runtime.getRuntime().availableProcessors() * 1), WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 1), + REST_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.rest.service.rpc.server.worker-thread-num", + Runtime.getRuntime().availableProcessors() * 1), // Task Configuration ----------------------------------------------------- TASK_DEFAULT_MEMORY("tajo.task.memory-slot-mb.default", 512), http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 38bddec..61a156b 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -277,7 +277,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index aed7b4b..da5f48f 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -60,7 +60,12 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-ws-rs</artifactId> <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 9f1b1ab..37121e3 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -38,6 +38,7 @@ <tajo.version>0.11.0-SNAPSHOT</tajo.version> <hbase.version>0.98.7-hadoop2</hbase.version> <netty.version>4.0.25.Final</netty.version> + <jersey.version>2.6</jersey.version> <tajo.root>${project.parent.relativePath}/..</tajo.root> <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path> </properties> @@ -789,6 +790,21 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> + <artifactId>tajo-rpc-common</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-rpc-protobuf</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-ws-rs</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> <artifactId>tajo-algebra</artifactId> <version>${tajo.version}</version> </dependency> @@ -1063,6 +1079,21 @@ <artifactId>jcip-annotations</artifactId> <version>1.0-1</version> </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-common</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-server</artifactId> + <version>${jersey.version}</version> + </dependency> + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>javax.ws.rs-api</artifactId> + <version>2.0.1</version> + </dependency> </dependencies> </dependencyManagement> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-pullserver/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml index 944cf3d..ba6e6b7 100644 --- a/tajo-pullserver/pom.xml +++ b/tajo-pullserver/pom.xml @@ -56,7 +56,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-rpc</artifactId> + <artifactId>tajo-rpc-protobuf</artifactId> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml index 8c626b4..f069aca 100644 --- a/tajo-rpc/pom.xml +++ b/tajo-rpc/pom.xml @@ -24,165 +24,39 @@ <groupId>org.apache.tajo</groupId> <relativePath>../tajo-project</relativePath> </parent> - <packaging>jar</packaging> <artifactId>tajo-rpc</artifactId> - <name>Tajo Rpc</name> - <description>RPC Server/Client Implementation based on Netty and Protocol Buffer</description> + <packaging>pom</packaging> + <name>Tajo RPC</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <modules> + <module>tajo-rpc-common</module> + <module>tajo-rpc-protobuf</module> + <module>tajo-ws-rs</module> + </modules> <build> <plugins> <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.6</source> - <target>1.6</target> - <encoding>${project.build.sourceEncoding}</encoding> - </configuration> - </plugin> - <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> - <executions> - <execution> - <phase>verify</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.4</version> - <configuration> - </configuration> - <executions> - <execution> - <id>create-jar</id> - <phase>prepare-package</phase> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <id>create-protobuf-generated-sources-directory</id> - <phase>initialize</phase> - <configuration> - <target> - <mkdir dir="target/generated-sources/proto" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <version>1.2</version> - <executions> - <execution> - <id>generate-sources</id> - <phase>generate-sources</phase> - <configuration> - <executable>protoc</executable> - <arguments> - <argument>-Isrc/main/proto/</argument> - <argument>--java_out=target/generated-sources/proto</argument> - <argument>src/main/proto/DummyProtos.proto</argument> - <argument>src/main/proto/RpcProtos.proto</argument> - <argument>src/main/proto/TestProtos.proto</argument> - <argument>src/main/proto/TestProtocol.proto</argument> - </arguments> - </configuration> - <goals> - <goal>exec</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.5</version> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/proto</source> - </sources> - </configuration> - </execution> - </executions> + <artifactId>maven-surefire-report-plugin</artifactId> </plugin> <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.15</version> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> </plugin> </plugins> </build> - <dependencies> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-transport</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-codec</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-handler</artifactId> - </dependency> - <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - </properties> - <repositories> - <repository> - <id>repository.jboss.org</id> - <url>https://repository.jboss.org/nexus/content/repositories/releases/ - </url> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> - </repositories> <profiles> <profile> @@ -216,6 +90,9 @@ <id>dist</id> <activation> <activeByDefault>false</activeByDefault> + <property> + <name>tar|rpm|deb</name> + </property> </activation> <build> <plugins> @@ -225,7 +102,7 @@ <executions> <execution> <id>dist</id> - <phase>package</phase> + <phase>prepare-package</phase> <goals> <goal>run</goal> </goals> @@ -248,12 +125,15 @@ echo echo "Current directory `pwd`" echo - run rm -rf ${project.artifactId}-${project.version} - run mkdir ${project.artifactId}-${project.version} - run cd ${project.artifactId}-${project.version} - run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + run rm -rf tajo-rpc-${project.version} + run mkdir tajo-rpc-${project.version} + run cd tajo-rpc-${project.version} + run cp -r ${basedir}/tajo-rpc-common/target/tajo-rpc-common-${project.version}*.jar . + run cp -r ${basedir}/tajo-rpc-protobuf/target/tajo-rpc-protobuf-${project.version}*.jar . + run cp -r ${basedir}/tajo-ws-rs/target/tajo-ws-rs-${project.version}*.jar . + echo - echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo "Tajo RPC dist layout available at: ${project.build.directory}/tajo-rpc-${project.version}" echo </echo> <exec executable="sh" dir="${project.build.directory}" failonerror="true"> @@ -274,9 +154,9 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-report-plugin</artifactId> - <version>2.15</version> </plugin> </plugins> </reporting> </project> + http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java deleted file mode 100644 index 3d856ce..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.*; - -import io.netty.channel.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; - -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.GenericFutureListener; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -public class AsyncRpcClient extends NettyClientBase { - private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - - private final ConcurrentMap<Integer, ResponseCallback> requests = - new ConcurrentHashMap<Integer, ResponseCallback>(); - - private final Method stubMethod; - private final ProxyRpcChannel rpcChannel; - private final ClientChannelInboundHandler inboundHandler; - - /** - * Intentionally make this method package-private, avoiding user directly - * new an instance through this constructor. - */ - AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) - throws ClassNotFoundException, NoSuchMethodException { - super(rpcConnectionKey, retries); - stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); - rpcChannel = new ProxyRpcChannel(); - inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); - } - - @Override - public <T> T getStub() { - return getStub(stubMethod, rpcChannel); - } - - protected void sendExceptions(String message) { - for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) { - ResponseCallback callback = callbackEntry.getValue(); - Integer id = callbackEntry.getKey(); - - RpcResponse.Builder responseBuilder = RpcResponse.newBuilder() - .setErrorMessage(message) - .setId(id); - - callback.run(responseBuilder.build()); - } - } - - @Override - public void close() { - sendExceptions("AsyncRpcClient terminates all the connections"); - - super.close(); - } - - private class ProxyRpcChannel implements RpcChannel { - - public void callMethod(final MethodDescriptor method, - final RpcController controller, - final Message param, - final Message responseType, - RpcCallback<Message> done) { - - int nextSeqId = sequence.getAndIncrement(); - - Message rpcRequest = buildRequest(nextSeqId, method, param); - - inboundHandler.registerCallback(nextSeqId, - new ResponseCallback(controller, responseType, done)); - - ChannelPromise channelPromise = getChannel().newPromise(); - channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); - } - } - }); - getChannel().writeAndFlush(rpcRequest, channelPromise); - } - - private Message buildRequest(int seqId, - MethodDescriptor method, - Message param) { - - RpcRequest.Builder requestBuilder = RpcRequest.newBuilder() - .setId(seqId) - .setMethodName(method.getName()); - - if (param != null) { - requestBuilder.setRequestMessage(param.toByteString()); - } - - return requestBuilder.build(); - } - } - - private class ResponseCallback implements RpcCallback<RpcResponse> { - private final RpcController controller; - private final Message responsePrototype; - private final RpcCallback<Message> callback; - - public ResponseCallback(RpcController controller, - Message responsePrototype, - RpcCallback<Message> callback) { - this.controller = controller; - this.responsePrototype = responsePrototype; - this.callback = callback; - } - - @Override - public void run(RpcResponse rpcResponse) { - // if hasErrorMessage is true, it means rpc-level errors. - // it does not call the callback function\ - if (rpcResponse.hasErrorMessage()) { - if (controller != null) { - this.controller.setFailed(rpcResponse.getErrorMessage()); - } - callback.run(null); - } else { // if rpc call succeed - try { - Message responseMessage; - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { - responseMessage = responsePrototype.newBuilderForType().mergeFrom( - rpcResponse.getResponseMessage()).build(); - } - - callback.run(responseMessage); - - } catch (InvalidProtocolBufferException e) { - throw new RemoteException(getErrorMessage(""), e); - } - } - } - } - - private String getErrorMessage(String message) { - return "Exception [" + protocol.getCanonicalName() + - "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().remoteAddress()) + ")]: " + message; - } - - @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { - - void registerCallback(int seqId, ResponseCallback callback) { - - if (requests.putIfAbsent(seqId, callback) != null) { - throw new RemoteException( - getErrorMessage("Duplicate Sequence Id "+ seqId)); - } - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcResponse) { - try { - RpcResponse response = (RpcResponse) msg; - ResponseCallback callback = requests.remove(response.getId()); - - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - callback.run(response); - } - } finally { - ReferenceCountUtil.release(msg); - } - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); - - sendExceptions(cause.getMessage()); - - if(LOG.isDebugEnabled()) { - LOG.error(cause.getMessage(), cause); - } else { - LOG.error("RPC Exception:" + cause.getMessage()); - } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java deleted file mode 100644 index 3b5a747..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.*; -import com.google.protobuf.Descriptors.MethodDescriptor; - -import io.netty.channel.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; - -import io.netty.util.ReferenceCountUtil; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; - -public class AsyncRpcServer extends NettyServerBase { - private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class); - - private final Service service; - private final ChannelInitializer<Channel> initializer; - - public AsyncRpcServer(final Class<?> protocol, - final Object instance, - final InetSocketAddress bindAddress, - final int workerNum) - throws Exception { - super(protocol.getSimpleName(), bindAddress); - - String serviceClassName = protocol.getName() + "$" + - protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface"); - Method method = serviceClass.getMethod("newReflectiveService", interfaceClass); - this.service = (Service) method.invoke(null, instance); - - this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); - super.init(this.initializer, workerNum); - } - - @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - accepted.add(ctx.channel()); - if(LOG.isDebugEnabled()){ - LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); - } - super.channelRegistered(ctx); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - accepted.remove(ctx.channel()); - if (LOG.isDebugEnabled()) { - LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); - } - super.channelUnregistered(ctx); - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof RpcRequest) { - try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } - - final RpcController controller = new NettyRpcController(); - - RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { - - public void run(Message returnValue) { - - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } - - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } - - ctx.writeAndFlush(builder.build()); - } - }; - - service.callMethod(methodDescriptor, controller, paramProto, callback); - - } finally { - ReferenceCountUtil.release(msg); - } - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception{ - if (cause instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) cause; - ctx.writeAndFlush(callException.getResponse()); - } else { - LOG.error(cause.getMessage()); - } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java deleted file mode 100644 index 6a90330..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.*; -import com.google.protobuf.Descriptors.MethodDescriptor; - -import io.netty.channel.*; -import io.netty.util.concurrent.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; - -import io.netty.util.ReferenceCountUtil; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.*; -import java.util.concurrent.Future; - -public class BlockingRpcClient extends NettyClientBase { - private static final Log LOG = LogFactory.getLog(RpcProtos.class); - - private final Map<Integer, ProtoCallFuture> requests = - new ConcurrentHashMap<Integer, ProtoCallFuture>(); - - private final Method stubMethod; - private final ProxyRpcChannel rpcChannel; - private final ChannelInboundHandlerAdapter inboundHandler; - - /** - * Intentionally make this method package-private, avoiding user directly - * new an instance through this constructor. - */ - BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) - throws ClassNotFoundException, NoSuchMethodException { - super(rpcConnectionKey, retries); - stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); - rpcChannel = new ProxyRpcChannel(); - inboundHandler = new ClientChannelInboundHandler(); - init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); - } - - @Override - public <T> T getStub() { - return getStub(stubMethod, rpcChannel); - } - - @Override - public void close() { - for(ProtoCallFuture callback: requests.values()) { - callback.setFailed("BlockingRpcClient terminates all the connections", - new ServiceException("BlockingRpcClient terminates all the connections")); - } - - super.close(); - } - - private class ProxyRpcChannel implements BlockingRpcChannel { - - @Override - public Message callBlockingMethod(final MethodDescriptor method, - final RpcController controller, - final Message param, - final Message responsePrototype) - throws TajoServiceException { - - int nextSeqId = sequence.getAndIncrement(); - - Message rpcRequest = buildRequest(nextSeqId, method, param); - - ProtoCallFuture callFuture = - new ProtoCallFuture(controller, responsePrototype); - requests.put(nextSeqId, callFuture); - - ChannelPromise channelPromise = getChannel().newPromise(); - channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); - } - } - }); - getChannel().writeAndFlush(rpcRequest, channelPromise); - - try { - return callFuture.get(60, TimeUnit.SECONDS); - } catch (Throwable t) { - if (t instanceof ExecutionException) { - Throwable cause = t.getCause(); - if (cause != null && cause instanceof TajoServiceException) { - throw (TajoServiceException)cause; - } - } - throw new TajoServiceException(t.getMessage()); - } - } - - private Message buildRequest(int seqId, - MethodDescriptor method, - Message param) { - RpcRequest.Builder requestBuilder = RpcRequest.newBuilder() - .setId(seqId) - .setMethodName(method.getName()); - - if (param != null) { - requestBuilder.setRequestMessage(param.toByteString()); - } - - return requestBuilder.build(); - } - } - - private String getErrorMessage(String message) { - if(getChannel() != null) { - return protocol.getName() + - "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().remoteAddress()) + "): " + message; - } else { - return "Exception " + message; - } - } - - private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { - if(getChannel() != null) { - return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), - RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); - } else { - return new TajoServiceException(response.getErrorMessage()); - } - } - - @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - if (msg instanceof RpcResponse) { - try { - RpcResponse rpcResponse = (RpcResponse) msg; - ProtoCallFuture callback = requests.remove(rpcResponse.getId()); - - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - if (rpcResponse.hasErrorMessage()) { - callback.setFailed(rpcResponse.getErrorMessage(), - makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); - throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); - } else { - Message responseMessage; - - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { - responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) - .build(); - } - - callback.setResponse(responseMessage); - } - } - } finally { - ReferenceCountUtil.release(msg); - } - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - for(ProtoCallFuture callback: requests.values()) { - callback.setFailed(cause.getMessage(), cause); - } - - if(LOG.isDebugEnabled()) { - LOG.error("" + cause.getMessage(), cause); - } else { - LOG.error("RPC Exception:" + cause.getMessage()); - } - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - } - - static class ProtoCallFuture implements Future<Message> { - private Semaphore sem = new Semaphore(0); - private Message response = null; - private Message returnType; - - private RpcController controller; - - private ExecutionException ee; - - public ProtoCallFuture(RpcController controller, Message message) { - this.controller = controller; - this.returnType = message; - } - - @Override - public boolean cancel(boolean arg0) { - return false; - } - - @Override - public Message get() throws InterruptedException, ExecutionException { - sem.acquire(); - if(ee != null) { - throw ee; - } - return response; - } - - @Override - public Message get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - if(sem.tryAcquire(timeout, unit)) { - if (ee != null) { - throw ee; - } - return response; - } else { - throw new TimeoutException(); - } - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public boolean isDone() { - return sem.availablePermits() > 0; - } - - public void setResponse(Message response) { - this.response = response; - sem.release(); - } - - public void setFailed(String errorText, Throwable t) { - if(controller != null) { - this.controller.setFailed(errorText); - } - ee = new ExecutionException(errorText, t); - sem.release(); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java deleted file mode 100644 index 0ce359f..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; - -import io.netty.channel.*; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcProtos.RpcRequest; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; - -import io.netty.util.ReferenceCountUtil; - -import java.lang.reflect.Method; -import java.net.InetSocketAddress; - -public class BlockingRpcServer extends NettyServerBase { - private static Log LOG = LogFactory.getLog(BlockingRpcServer.class); - private final BlockingService service; - private final ChannelInitializer<Channel> initializer; - - public BlockingRpcServer(final Class<?> protocol, - final Object instance, - final InetSocketAddress bindAddress, - final int workerNum) - throws Exception { - - super(protocol.getSimpleName(), bindAddress); - - String serviceClassName = protocol.getName() + "$" + - protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - Class<?> interfaceClass = Class.forName(serviceClassName + - "$BlockingInterface"); - Method method = serviceClass.getMethod( - "newReflectiveBlockingService", interfaceClass); - - this.service = (BlockingService) method.invoke(null, instance); - this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); - - super.init(this.initializer, workerNum); - } - - @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - accepted.add(ctx.channel()); - if(LOG.isDebugEnabled()){ - LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); - } - super.channelRegistered(ctx); - } - - @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - accepted.remove(ctx.channel()); - if (LOG.isDebugEnabled()) { - LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); - } - super.channelUnregistered(ctx); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - if (msg instanceof RpcRequest) { - try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } - Message returnValue; - RpcController controller = new NettyRpcController(); - - try { - returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } - - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } - ctx.writeAndFlush(builder.build()); - } finally { - ReferenceCountUtil.release(msg); - } - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (cause instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) cause; - ctx.writeAndFlush(callException.getResponse()); - } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java deleted file mode 100644 index c4c3256..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -import java.util.concurrent.Future; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public class CallFuture<T> implements RpcCallback<T>, Future<T> { - - private final Semaphore sem = new Semaphore(0); - private boolean done = false; - private T response; - private RpcController controller; - - public CallFuture() { - controller = new DefaultRpcController(); - } - - public RpcController getController() { - return controller; - } - - @Override - public void run(T t) { - this.response = t; - done = true; - sem.release(); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - controller.startCancel(); - sem.release(); - return controller.isCanceled(); - } - - @Override - public boolean isCancelled() { - return controller.isCanceled(); - } - - @Override - public boolean isDone() { - return done; - } - - @Override - public T get() throws InterruptedException { - sem.acquire(); - - return response; - } - - @Override - public T get(long timeout, TimeUnit unit) - throws InterruptedException, TimeoutException { - if (sem.tryAcquire(timeout, unit)) { - return response; - } else { - throw new TimeoutException(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java deleted file mode 100644 index 4ba19a5..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -public class DefaultRpcController implements RpcController { - private String errorText; - private boolean error; - private boolean canceled; - - @Override - public void reset() { - errorText = ""; - error = false; - canceled = false; - } - - @Override - public boolean failed() { - return error; - } - - @Override - public String errorText() { - return errorText; - } - - @Override - public void startCancel() { - this.canceled = true; - } - - @Override - public void setFailed(String s) { - this.errorText = s; - this.error = true; - } - - @Override - public boolean isCanceled() { - return canceled; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) { - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java deleted file mode 100644 index 72278f2..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import io.netty.channel.*; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.GenericFutureListener; - -import java.io.Closeable; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -public abstract class NettyClientBase implements Closeable { - private static final Log LOG = LogFactory.getLog(NettyClientBase.class); - private static final int CONNECTION_TIMEOUT = 60000; // 60 sec - private static final long PAUSE = 1000; // 1 sec - - private final int numRetries; - - private Bootstrap bootstrap; - private volatile ChannelFuture channelFuture; - - protected final Class<?> protocol; - protected final AtomicInteger sequence = new AtomicInteger(0); - - private final RpcConnectionKey key; - private final AtomicInteger counter = new AtomicInteger(0); // reference counter - - public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) - throws ClassNotFoundException, NoSuchMethodException { - this.key = rpcConnectionKey; - this.protocol = rpcConnectionKey.protocolClass; - this.numRetries = numRetries; - } - - // should be called from sub class - protected void init(ChannelInitializer<Channel> initializer) { - this.bootstrap = new Bootstrap(); - this.bootstrap - .channel(NioSocketChannel.class) - .handler(initializer) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT) - .option(ChannelOption.SO_RCVBUF, 1048576 * 10) - .option(ChannelOption.TCP_NODELAY, true); - } - - public RpcConnectionPool.RpcConnectionKey getKey() { - return key; - } - - protected final Class<?> getServiceClass() throws ClassNotFoundException { - String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; - return Class.forName(serviceClassName); - } - - @SuppressWarnings("unchecked") - protected final <T> T getStub(Method stubMethod, Object rpcChannel) { - try { - return (T) stubMethod.invoke(null, rpcChannel); - } catch (Exception e) { - throw new RemoteException(e.getMessage(), e); - } - } - - public abstract <T> T getStub(); - - public boolean acquire(long timeout) { - if (!checkConnection(timeout)) { - return false; - } - counter.incrementAndGet(); - return true; - } - - public boolean release() { - return counter.decrementAndGet() == 0; - } - - private boolean checkConnection(long timeout) { - if (isConnected()) { - return true; - } - - InetSocketAddress addr = key.addr; - if (addr.isUnresolved()) { - addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); - } - - return handleConnectionInternally(addr, timeout); - } - - private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { - LOG.warn("Try to connect : " + address); - this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) - .connect(address) - .addListener(listener); - } - - // first attendant kicks connection - private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>(); - - private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) { - final CountDownLatch ticket = new CountDownLatch(1); - final CountDownLatch granted = connect.check(ticket); - - // basically, it's double checked lock - if (ticket == granted && isConnected()) { - granted.countDown(); - return true; - } - - if (ticket == granted) { - connectUsingNetty(addr, new RetryConnectionListener(addr, granted)); - } - - try { - granted.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore - } - - boolean success = channelFuture.isSuccess(); - - if (granted.getCount() == 0) { - connect.clear(granted); - } - - return success; - } - - class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { - private final AtomicInteger retryCount = new AtomicInteger(); - private final InetSocketAddress address; - private final CountDownLatch latch; - - RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) { - this.address = address; - this.latch = latch; - } - - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (!channelFuture.isSuccess()) { - channelFuture.channel().close(); - - if (numRetries > retryCount.getAndIncrement()) { - final GenericFutureListener<ChannelFuture> currentListener = this; - - RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { - @Override - public void run() { - connectUsingNetty(address, currentListener); - } - }, PAUSE, TimeUnit.MILLISECONDS); - - LOG.debug("Connecting to " + address + " has been failed. Retrying to connect."); - } - else { - latch.countDown(); - - LOG.error("Max retry count has been exceeded. attempts=" + numRetries); - } - } - else { - latch.countDown(); - } - } - } - - public Channel getChannel() { - return channelFuture == null ? null : channelFuture.channel(); - } - - public boolean isConnected() { - Channel channel = getChannel(); - return channel != null && channel.isOpen() && channel.isActive(); - } - - public SocketAddress getRemoteAddress() { - Channel channel = getChannel(); - return channel == null ? null : channel.remoteAddress(); - } - - @Override - public void close() { - Channel channel = getChannel(); - if (channel != null && channel.isOpen()) { - LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); - channel.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java deleted file mode 100644 index b7f4537..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyRpcController.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -public class NettyRpcController implements RpcController { - private String errorText; - - @Override - public void reset() { - errorText = null; - } - - @Override - public boolean failed() { - return errorText != null; - } - - @Override - public String errorText() { - return errorText; - } - - @Override - public void startCancel() { - // TODO - to be implemented - throw new UnsupportedOperationException(); - } - - @Override - public void setFailed(String s) { - errorText = s; - } - - @Override - public boolean isCanceled() { - // TODO - to be implemented - return false; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java deleted file mode 100644 index 024108b..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.util.concurrent.GlobalEventExecutor; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -public class NettyServerBase { - private static final Log LOG = LogFactory.getLog(NettyServerBase.class); - private static final String DEFAULT_PREFIX = "RpcServer_"; - private static final AtomicInteger sequenceId = new AtomicInteger(0); - - protected String serviceName; - protected InetSocketAddress serverAddr; - protected InetSocketAddress bindAddress; - protected ChannelInitializer<Channel> initializer; - protected ServerBootstrap bootstrap; - protected ChannelFuture channelFuture; - protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - - private InetSocketAddress initIsa; - - public NettyServerBase(InetSocketAddress address) { - this.initIsa = address; - } - - public NettyServerBase(String serviceName, InetSocketAddress addr) { - this.serviceName = serviceName; - this.initIsa = addr; - } - - public void setName(String name) { - this.serviceName = name; - } - - public void init(ChannelInitializer<Channel> initializer, int workerNum) { - bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); - - this.initializer = initializer; - bootstrap - .channel(NioServerSocketChannel.class) - .childHandler(initializer) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) - .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); - } - - public InetSocketAddress getListenAddress() { - return this.bindAddress; - } - - public void start() { - if (serviceName == null) { - this.serviceName = getNextDefaultServiceName(); - } - - if (initIsa.getPort() == 0) { - try { - int port = getUnusedPort(); - serverAddr = new InetSocketAddress(initIsa.getHostName(), port); - } catch (IOException e) { - LOG.error(e, e); - } - } else { - serverAddr = initIsa; - } - - this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly(); - this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress(); - - LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress); - } - - public Channel getChannel() { - return this.channelFuture.channel(); - } - - public void shutdown() { - shutdown(false); - } - - public void shutdown(boolean waitUntilThreadsStop) { - try { - accepted.close(); - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - } - - if(bootstrap != null) { - if (bootstrap.childGroup() != null) { - bootstrap.childGroup().shutdownGracefully(); - if (waitUntilThreadsStop) { - bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); - } - } - - if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(); - if (waitUntilThreadsStop) { - bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); - } - } - } - - if (bindAddress != null) { - LOG.info("Rpc (" + serviceName + ") listened on " - + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown"); - } - } - - private static String getNextDefaultServiceName() { - return DEFAULT_PREFIX + sequenceId.getAndIncrement(); - } - - private static final int startPortRange = 10000; - private static final int endPortRange = 50000; - private static final Random rnd = new Random(System.currentTimeMillis()); - // each system has a different starting port number within the given range. - private static final AtomicInteger nextPortNum = - new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange)); - private static final Object lockObject = new Object(); - - - private synchronized static int getUnusedPort() throws IOException { - while (true) { - int port = nextPortNum.getAndIncrement(); - if (port >= endPortRange) { - synchronized (lockObject) { - nextPortNum.set(startPortRange); - port = nextPortNum.getAndIncrement(); - } - } - if (available(port)) { - return port; - } - } - } - - private static boolean available(int port) throws IOException { - if (port < 1024 || port > 65535) { - throw new IllegalArgumentException("Port Number Out of Bound: " + port); - } - - ServerSocket ss = null; - DatagramSocket ds = null; - - try { - ss = new ServerSocket(port); - ss.setReuseAddress(true); - - ds = new DatagramSocket(port); - ds.setReuseAddress(true); - - return true; - - } catch (IOException e) { - return false; - } finally { - if (ss != null) { - ss.close(); - } - - if (ds != null) { - ds.close(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java deleted file mode 100644 index 9b7f8ac..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.RpcCallback; - -public class NullCallback implements RpcCallback<Object> { - private final static NullCallback instance; - - static { - instance = new NullCallback(); - } - - public static RpcCallback get() { - return instance; - } - - @Override - public void run(Object parameter) { - // do nothing - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java deleted file mode 100644 index 6a340dc..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; - -import com.google.protobuf.MessageLite; - -class ProtoChannelInitializer extends ChannelInitializer<Channel> { - private final MessageLite defaultInstance; - private final ChannelHandler handler; - - public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { - this.handler = handler; - this.defaultInstance = defaultInstance; - } - - @Override - protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); - pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); - pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); - pipeline.addLast("protobufEncoder", new ProtobufEncoder()); - pipeline.addLast("handler", handler); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java deleted file mode 100644 index 52ef31a..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import org.apache.tajo.rpc.RpcProtos.RpcResponse; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.io.Writer; - -public class RemoteCallException extends RemoteException { - private int seqId; - private String originExceptionClass; - - public RemoteCallException(int seqId, MethodDescriptor methodDesc, - Throwable t) { - super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t); - this.seqId = seqId; - if (t != null) { - originExceptionClass = t.getClass().getCanonicalName(); - } - } - - public RemoteCallException(int seqId, Throwable t) { - super(t); - this.seqId = seqId; - if (t != null) { - originExceptionClass = t.getClass().getCanonicalName(); - } - } - - public RpcResponse getResponse() { - RpcResponse.Builder builder = RpcResponse.newBuilder(); - builder.setId(seqId); - if (getCause().getMessage() == null) { - builder.setErrorMessage(getCause().getClass().getName()); - } else { - builder.setErrorMessage(getCause().getMessage()); - } - builder.setErrorTrace(getStackTraceString(getCause())); - builder.setErrorClass(originExceptionClass); - - return builder.build(); - } - - private static String getStackTraceString(Throwable aThrowable) { - final Writer result = new StringWriter(); - final PrintWriter printWriter = new PrintWriter(result); - aThrowable.printStackTrace(printWriter); - return result.toString(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java deleted file mode 100644 index 30c110d..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteException.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -public class RemoteException extends RuntimeException { - public RemoteException() { - super(); - } - - public RemoteException(String message) { - super(message); - } - - public RemoteException(Throwable t) { - super(t); - } - - public RemoteException(String message, Throwable t) { - super(message, t); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java deleted file mode 100644 index 3c054ad..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import java.io.IOException; -import java.util.Date; -import java.util.List; - -public class RetriesExhaustedException extends RuntimeException { - private static final long serialVersionUID = 1876775844L; - - public RetriesExhaustedException(final String msg) { - super(msg); - } - - public RetriesExhaustedException(final String msg, final IOException e) { - super(msg, e); - } - - /** - * Datastructure that allows adding more info around Throwable incident. - */ - public static class ThrowableWithExtraContext { - private final Throwable t; - private final long when; - private final String extras; - - public ThrowableWithExtraContext(final Throwable t, final long when, - final String extras) { - this.t = t; - this.when = when; - this.extras = extras; - } - - @Override - public String toString() { - return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); - } - } - - /** - * Create a new RetriesExhaustedException from the list of prior failures. - * @param callableVitals Details from the {@link ServerCallable} we were using - * when we got this exception. - * @param numTries The number of tries we made - * @param exceptions List of exceptions that failed before giving up - */ - public RetriesExhaustedException(final String callableVitals, int numTries, - List<Throwable> exceptions) { - super(getMessage(callableVitals, numTries, exceptions)); - } - - /** - * Create a new RetriesExhaustedException from the list of prior failures. - * @param numTries - * @param exceptions List of exceptions that failed before giving up - */ - public RetriesExhaustedException(final int numTries, - final List<Throwable> exceptions) { - super(getMessage(numTries, exceptions)); - } - - private static String getMessage(String callableVitals, int numTries, - List<Throwable> exceptions) { - StringBuilder buffer = new StringBuilder("Failed contacting "); - buffer.append(callableVitals); - buffer.append(" after "); - buffer.append(numTries + 1); - buffer.append(" attempts.\nExceptions:\n"); - for (Throwable t : exceptions) { - buffer.append(t.toString()); - buffer.append("\n"); - } - return buffer.toString(); - } - - private static String getMessage(final int numTries, - final List<Throwable> exceptions) { - StringBuilder buffer = new StringBuilder("Failed after attempts="); - buffer.append(numTries + 1); - buffer.append(", exceptions:\n"); - for (Throwable t : exceptions) { - buffer.append(t.toString()); - buffer.append("\n"); - } - return buffer.toString(); - } -} \ No newline at end of file
