Author: lewismc
Date: Wed Sep 16 03:37:00 2015
New Revision: 1703325
URL: http://svn.apache.org/r1703325
Log:
OODT-855 Integrate Avro RPC with the catalog module using Serializer
Added:
oodt/branches/avro_rpc/catalog/src/main/avro/
oodt/branches/avro_rpc/catalog/src/main/avro/CommunicationChannel.avpr
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClient.java
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClientFactory.java
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServer.java
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServerFactory.java
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/TestAvrorpcCommunicationChannelServer.java
Modified:
oodt/branches/avro_rpc/CHANGES.txt
oodt/branches/avro_rpc/catalog/pom.xml
Modified: oodt/branches/avro_rpc/CHANGES.txt
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/CHANGES.txt?rev=1703325&r1=1703324&r2=1703325&view=diff
==============================================================================
--- oodt/branches/avro_rpc/CHANGES.txt (original)
+++ oodt/branches/avro_rpc/CHANGES.txt Wed Sep 16 03:37:00 2015
@@ -3,6 +3,8 @@ Apache OODT Change Log
Current Development
+* OODT-855 Integrate Avro RPC with the catalog module using Serializer (Radu
Manole, mattmann, lewismc)
+
* OODT-866 Integrate Avro RPC with the workflow module (Radu Manole, mattmann,
lewismc)
* OODT-874 Corrected workflow/bin/wmgr and resource/bin/resmgr to make sure
Modified: oodt/branches/avro_rpc/catalog/pom.xml
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/pom.xml?rev=1703325&r1=1703324&r2=1703325&view=diff
==============================================================================
--- oodt/branches/avro_rpc/catalog/pom.xml (original)
+++ oodt/branches/avro_rpc/catalog/pom.xml Wed Sep 16 03:37:00 2015
@@ -90,6 +90,23 @@
</profiles>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.7.7</version>
+ <configuration>
+ <stringType>String</stringType>
+ </configuration>
+ <executions>
+ <execution>
+ <id>schemas</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protocol</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
@@ -136,6 +153,17 @@
</notifiers>
</ciManagement>
<dependencies>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.oodt</groupId>
<artifactId>oodt-commons</artifactId>
Added: oodt/branches/avro_rpc/catalog/src/main/avro/CommunicationChannel.avpr
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/main/avro/CommunicationChannel.avpr?rev=1703325&view=auto
==============================================================================
--- oodt/branches/avro_rpc/catalog/src/main/avro/CommunicationChannel.avpr
(added)
+++ oodt/branches/avro_rpc/catalog/src/main/avro/CommunicationChannel.avpr Wed
Sep 16 03:37:00 2015
@@ -0,0 +1,481 @@
+{
+ "protocol" : "AvroCommunicationChannel",
+ "namespace" : "org.apache.oodt.cas.catalog.server.channel.avrorpc",
+ "messages" :
+ {
+ "avrorpc_startup" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "null"
+ },
+ "avrorpc_shutdown" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_addCatalog1" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_replaceCatalog" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_addCatalog2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "indexObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_addCatalog3" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "indexObject",
+ "type" : "string"
+ },
+ {
+ "name" : "dictionariesObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_addCatalog5" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "indexObject",
+ "type" : "string"
+ },
+ {
+ "name" : "dictionariesObject",
+ "type" : "string"
+ },
+ {
+ "name" : "restrictQueryPermissionObject",
+ "type" : "string"
+ },
+ {
+ "name" : "restrictIngestPermissionObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_addDictionary" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "dictionariesObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_replaceDictionaries" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "dictionariesObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_replaceIndex" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "indexObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_modifyIngestPermission" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "restrictIngestPermissionObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_modifyQueryPermission" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogId",
+ "type" : "string"
+ },
+ {
+ "name" : "restrictQueryPermissionObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_delete" :
+ {
+ "request" :
+ [
+ {
+ "name" : "metadataObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_getPluginUrls" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_addPluginUrls" :
+ {
+ "request" :
+ [
+ {
+ "name" : "pluginUrlsObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_getPluginStorageDir" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_transferFile" :
+ {
+ "request" :
+ [
+ {
+ "name" : "filePath",
+ "type" : "string"
+ },
+ {
+ "name" : "fileData",
+ "type" : "bytes"
+ },
+ {
+ "name" : "offset",
+ "type" : "int"
+ },
+ {
+ "name" : "numBytes",
+ "type" : "int"
+ }
+ ],
+ "response" : "boolean"
+ },
+ "avrorpc_getAllPages" :
+ {
+ "request" :
+ [
+ {
+ "name" : "queryPagerObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCalalogProperties1" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCalalogProperties2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogUrn",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCatalogServiceTransactionId" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogTransactionIdObject",
+ "type" : "string"
+ },
+ {
+ "name" : "catalogUrn",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCatalogServiceTransactionId2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogReceiptObject",
+ "type" : "string"
+ },
+ {
+ "name" : "generateNewObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCatalogServiceTransactionIds" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogTransactionIdsObject",
+ "type" : "string"
+ },
+ {
+ "name" : "catalogUrn",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getCurrentCatalogIds" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getMetadataFromTransactionIdStrings" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogServiceTransactionIdStringsObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getMetadataFromTransactionIds" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogServiceTransactionIdsObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getNextPage" :
+ {
+ "request" :
+ [
+ {
+ "name" : "queryPagerObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getProperty" :
+ {
+ "request" :
+ [
+ {
+ "name" : "key",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_ingest" :
+ {
+ "request" :
+ [
+ {
+ "name" : "metadataObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_isRestrictIngestPermissions" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_isRestrictQueryPermissions" :
+ {
+ "request" :
+ [
+ ],
+ "response" : "string"
+ },
+ "avrorpc_query1" :
+ {
+ "request" :
+ [
+ {
+ "name" : "queryExpressionObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_query2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "queryExpressionObject",
+ "type" : "string"
+ },
+ {
+ "name" : "catalogIdsObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getNextPage2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "pageObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getPage2" :
+ {
+ "request" :
+ [
+ {
+ "name" : "pageInfoObject",
+ "type" : "string"
+ },
+ {
+ "name" : "queryExpressionObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getPage3" :
+ {
+ "request" :
+ [
+ {
+ "name" : "pageInfoObject",
+ "type" : "string"
+ },
+ {
+ "name" : "queryExpressionObject",
+ "type" : "string"
+ },
+ {
+ "name" : "catalogIdsObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_getMetadata" :
+ {
+ "request" :
+ [
+ {
+ "name" : "pageObject",
+ "type" : "string"
+ }
+ ],
+ "response" : "string"
+ },
+ "avrorpc_removeCatalog" :
+ {
+ "request" :
+ [
+ {
+ "name" : "catalogUrn",
+ "type" : "string"
+ }
+ ],
+ "response" : "boolean"
+ }
+ }
+}
\ No newline at end of file
Added:
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClient.java
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClient.java?rev=1703325&view=auto
==============================================================================
---
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClient.java
(added)
+++
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClient.java
Wed Sep 16 03:37:00 2015
@@ -0,0 +1,299 @@
+package org.apache.oodt.cas.catalog.server.channel.avrorpc;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.metadata.TransactionalMetadata;
+import org.apache.oodt.cas.catalog.page.*;
+import org.apache.oodt.cas.catalog.query.QueryExpression;
+import
org.apache.oodt.cas.catalog.server.channel.AbstractCommunicationChannelClient;
+import org.apache.oodt.cas.catalog.struct.Dictionary;
+import org.apache.oodt.cas.catalog.struct.Index;
+import org.apache.oodt.cas.catalog.struct.TransactionId;
+import org.apache.oodt.cas.catalog.system.Catalog;
+import org.apache.oodt.cas.catalog.util.PluginURL;
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.avro.ipc.Transceiver;
+
+//JDK imports
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+//APACHE imports
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+
+
+
+public class AvrorpcCommunicationChannelClient extends
AbstractCommunicationChannelClient {
+ private Transceiver client;
+ private AvroCommunicationChannel proxy;
+ protected int chunkSize;
+
+ public AvrorpcCommunicationChannelClient(URL serverUrl, int
connectionTimeout, int requestTimeout, int chunkSize) throws IOException {
+
+ this.client = new NettyTransceiver(new
InetSocketAddress(serverUrl.getHost(),serverUrl.getPort()),(long)connectionTimeout);
+
+ this.proxy = (AvroCommunicationChannel)
SpecificRequestor.getClient(AvroCommunicationChannel.class, client);
+ this.chunkSize = chunkSize;
+
+ }
+
+ @Override
+ public void shutdown() throws Exception {
+ this.proxy.avrorpc_shutdown();
+
+ }
+
+ @Override
+ public boolean isRestrictQueryPermissions() throws Exception {
+ return this.serializer.deserializeObject(Boolean.class,(String)
this.proxy.avrorpc_isRestrictQueryPermissions());
+ }
+
+ @Override
+ public boolean isRestrictIngestPermissions() throws Exception {
+ return this.serializer.deserializeObject(Boolean.class, (String)
this.proxy.avrorpc_isRestrictIngestPermissions());
+
+ }
+
+ @Override
+ public void addCatalog(Catalog catalog) throws Exception {
+
this.proxy.avrorpc_addCatalog1(this.serializer.serializeObject(catalog));
+ }
+
+ @Override
+ public void replaceCatalog(Catalog catalog) throws Exception {
+ this.proxy.avrorpc_replaceCatalog(
+ this.serializer.serializeObject(catalog)
+ );
+ }
+
+ @Override
+ public void addCatalog(String catalogId, Index index) throws Exception {
+ this.proxy.avrorpc_addCatalog2(catalogId,
+ this.serializer.serializeObject(index));
+ }
+
+ @Override
+ public void addCatalog(String catalogId, Index index, List<Dictionary>
dictionaries) throws Exception {
+ this.proxy.avrorpc_addCatalog3(catalogId,
+ this.serializer.serializeObject(index),
+ this.serializer.serializeObject(dictionaries));
+ }
+
+ @Override
+ public void addCatalog(String catalogId, Index index, List<Dictionary>
dictionaries, boolean restrictQueryPermission, boolean
restrictIngestPermission) throws Exception {
+ this.proxy.avrorpc_addCatalog5(catalogId,
+ this.serializer.serializeObject(index),
+ this.serializer.serializeObject(dictionaries),
+ this.serializer.serializeObject(restrictQueryPermission),
+ this.serializer.serializeObject(restrictIngestPermission));
+ }
+
+ @Override
+ public void addDictionary(String catalogId, Dictionary dictionary) throws
Exception {
+ this.proxy.avrorpc_addDictionary(catalogId,
+ this.serializer.serializeObject(dictionary));
+ }
+
+ @Override
+ public void replaceDictionaries(String catalogId, List<Dictionary>
dictionaries) throws Exception {
+ this.proxy.avrorpc_replaceDictionaries(catalogId,
+ this.serializer.serializeObject(dictionaries));
+ }
+
+ @Override
+ public void replaceIndex(String catalogId, Index index) throws Exception {
+ this.proxy.avrorpc_replaceIndex(catalogId,
+ this.serializer.serializeObject(index));
+ }
+
+ @Override
+ public void modifyIngestPermission(String catalogId, boolean
restrictIngestPermission) throws Exception {
+ this.proxy.avrorpc_modifyIngestPermission(catalogId,
+ this.serializer.serializeObject(restrictIngestPermission));
+ }
+
+ @Override
+ public void modifyQueryPermission(String catalogId, boolean
restrictQueryPermission) throws Exception {
+ this.proxy.avrorpc_modifyQueryPermission(catalogId,
+ this.serializer.serializeObject(restrictQueryPermission));
+ }
+
+ @Override
+ public void removeCatalog(String catalogUrn) throws Exception {
+ this.proxy.avrorpc_removeCatalog(catalogUrn);
+ }
+
+ @Override
+ public List<PluginURL> getPluginUrls() throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getPluginUrls());
+ }
+
+ @Override
+ public void addPluginUrls(List<PluginURL> pluginUrls) throws Exception {
+
this.proxy.avrorpc_addPluginUrls(this.serializer.serializeObject(pluginUrls));
+ }
+
+ @Override
+ public URL getPluginStorageDir() throws Exception {
+ return
this.serializer.deserializeObject(URL.class,this.proxy.avrorpc_getPluginStorageDir());
+ }
+
+ @Override
+ public void transferUrl(URL fromUrl, URL toURL) throws Exception {
+ System.out.println("Transfering '" + fromUrl + "' to '" + toURL + "'");
+ FileInputStream is = null;
+ try {
+ byte[] buf = new byte[this.chunkSize];
+ is = new FileInputStream(new File(fromUrl.getPath()));
+ int offset = 0;
+ int numBytes = 0;
+ while ((numBytes = is.read(buf, offset, chunkSize)) != -1)
+ this.transferFile(new File(toURL.getPath()).getAbsolutePath(),
buf, offset, numBytes);
+ }catch (Exception e) {
+ throw e;
+ }finally {
+ try {
+ is.close();
+ }catch(Exception e) {}
+ }
+ }
+
+ protected void transferFile(String filePath, byte[] fileData, int offset,
+ int numBytes) throws Exception {
+
this.proxy.avrorpc_transferFile(filePath,ByteBuffer.wrap(fileData),offset,numBytes);
+ }
+
+ @Override
+ public Set<String> getCurrentCatalogIds() throws Exception {
+ return
this.serializer.deserializeObject(Set.class,(String)this.proxy.avrorpc_getCurrentCatalogIds());
+ }
+
+ @Override
+ public TransactionReceipt ingest(Metadata metadata) throws Exception {
+ return
this.serializer.deserializeObject(TransactionReceipt.class,(String)this.proxy.avrorpc_ingest(this.serializer.serializeObject(metadata)));
+ }
+
+ @Override
+ public void delete(Metadata metadata) throws Exception {
+ this.proxy.avrorpc_delete(this.serializer.serializeObject(metadata));
+ }
+
+ @Override
+ public List<String> getProperty(String key) throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getProperty(key));
+ }
+
+ @Override
+ public Properties getCalalogProperties() throws Exception {
+ return
this.serializer.deserializeObject(Properties.class,(String)this.proxy.avrorpc_getCalalogProperties1());
+ }
+
+ @Override
+ public Properties getCalalogProperties(String catalogUrn) throws Exception
{
+ return
this.serializer.deserializeObject(Properties.class,(String)this.proxy.avrorpc_getCalalogProperties2(catalogUrn));
+ }
+
+ @Override
+ public Page getNextPage(Page page) throws Exception {
+ return
this.serializer.deserializeObject(Page.class,(String)this.proxy.avrorpc_getNextPage2(this.serializer.serializeObject(page)));
+ }
+
+ @Override
+ public Page getPage(PageInfo pageInfo, QueryExpression queryExpression)
throws Exception {
+ return
this.serializer.deserializeObject(Page.class,(String)this.proxy.avrorpc_getPage2(
+ this.serializer.serializeObject(pageInfo),
this.serializer.serializeObject(queryExpression)));
+ }
+
+ @Override
+ public Page getPage(PageInfo pageInfo, QueryExpression queryExpression,
Set<String> catalogIds) throws Exception {
+ return
this.serializer.deserializeObject(Page.class,(String)this.proxy.avrorpc_getPage3(
+ this.serializer.serializeObject(pageInfo),
+ this.serializer.serializeObject(queryExpression),
+ this.serializer.serializeObject(catalogIds)
+ ));
+ }
+
+ @Override
+ public List<TransactionalMetadata> getMetadata(Page page) throws Exception
{
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getMetadata(
+ this.serializer.serializeObject(page)
+ ));
+ }
+
+ @Override
+ public QueryPager query(QueryExpression queryExpression) throws Exception {
+ return
this.serializer.deserializeObject(QueryPager.class,(String)this.proxy.avrorpc_query1(
+ this.serializer.serializeObject(queryExpression)
+ ));
+ }
+
+ @Override
+ public QueryPager query(QueryExpression queryExpression, Set<String>
catalogIds) throws Exception {
+ return
this.serializer.deserializeObject(QueryPager.class,(String)this.proxy.avrorpc_query2(
+ this.serializer.serializeObject(queryExpression),
+ this.serializer.serializeObject(catalogIds)
+ ));
+ }
+
+ @Override
+ public List<TransactionalMetadata> getNextPage(QueryPager queryPager)
throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getNextPage(
+ this.serializer.serializeObject(queryPager)
+ ));
+ }
+ // somethings is wrong
+ @Override
+ public List<TransactionId<?>> getTransactionIdsForAllPages(QueryPager
queryPager) throws Exception {
+ return null;
+ }
+
+ @Override
+ public List<TransactionalMetadata> getAllPages(QueryPager queryPager)
throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getAllPages(
+ this.serializer.serializeObject(queryPager)
+ ));
+ }
+
+ @Override
+ public List<TransactionalMetadata>
getMetadataFromTransactionIdStrings(List<String>
catalogServiceTransactionIdStrings) throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getMetadataFromTransactionIdStrings(
+
this.serializer.serializeObject(catalogServiceTransactionIdStrings)
+ ));
+ }
+
+ @Override
+ public List<TransactionalMetadata>
getMetadataFromTransactionIds(List<TransactionId<?>>
catalogServiceTransactionIds) throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getMetadataFromTransactionIds(
+ this.serializer.serializeObject(catalogServiceTransactionIds)
+ ));
+ }
+
+ @Override
+ public List<TransactionId<?>>
getCatalogServiceTransactionIds(List<TransactionId<?>> catalogTransactionIds,
String catalogUrn) throws Exception {
+ return
this.serializer.deserializeObject(List.class,(String)this.proxy.avrorpc_getCatalogServiceTransactionId2(
+ this.serializer.serializeObject(catalogTransactionIds),
+ catalogUrn
+ ));
+ }
+
+ @Override
+ public TransactionId<?> getCatalogServiceTransactionId(TransactionId<?>
catalogTransactionId, String catalogUrn) throws Exception {
+ return
this.serializer.deserializeObject(TransactionId.class,(String)this.proxy.avrorpc_getCatalogServiceTransactionId2(
+ this.serializer.serializeObject(catalogTransactionId),
+ catalogUrn
+ ));
+ }
+
+ @Override
+ public TransactionId<?> getCatalogServiceTransactionId(CatalogReceipt
catalogReceipt, boolean generateNew) throws Exception {
+ return
this.serializer.deserializeObject(TransactionId.class,(String)this.proxy.avrorpc_getCatalogServiceTransactionId2(
+ this.serializer.serializeObject(catalogReceipt),
+ this.serializer.serializeObject(generateNew)
+ ));
+ }
+}
Added:
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClientFactory.java
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClientFactory.java?rev=1703325&view=auto
==============================================================================
---
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClientFactory.java
(added)
+++
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelClientFactory.java
Wed Sep 16 03:37:00 2015
@@ -0,0 +1,63 @@
+package org.apache.oodt.cas.catalog.server.channel.avrorpc;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.server.channel.CommunicationChannelClient;
+import
org.apache.oodt.cas.catalog.server.channel.CommunicationChannelClientFactory;
+
+//JDK imports
+import java.net.URL;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//FRAMEWORK imports
+import org.springframework.beans.factory.annotation.Required;
+
+/**
+ * Spring-configurable factory for {@link AvrorpcCommunicationChannelClient}.
+ *
+ * <p>All four properties are marked {@code @Required} and are passed through
+ * unchanged to the client constructor.
+ */
+public class AvrorpcCommunicationChannelClientFactory implements
+    CommunicationChannelClientFactory {
+
+  private static final Logger LOG =
+      Logger.getLogger(AvrorpcCommunicationChannelClientFactory.class.getName());
+
+  protected String serverUrl;
+  protected int connectionTimeout;
+  protected int requestTimeout;
+  protected int chunkSize;
+
+  /**
+   * Builds a client from the configured properties.
+   *
+   * @return the new client, or {@code null} if construction failed (the
+   *         failure is logged at SEVERE level)
+   */
+  @Override
+  public CommunicationChannelClient createCommunicationChannelClient() {
+    try {
+      return new AvrorpcCommunicationChannelClient(new URL(this.serverUrl),
+          this.connectionTimeout, this.requestTimeout, this.chunkSize);
+    } catch (Exception e) {
+      LOG.log(Level.SEVERE, "Failed to create AvrorpcCommunicationChannelClient : "
+          + e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /**
+   * @param serverUrl URL of the server, in a form accepted by {@link URL#URL(String)}
+   */
+  @Required
+  public void setServerUrl(String serverUrl) {
+    this.serverUrl = serverUrl;
+  }
+
+  public String getServerUrl() {
+    return this.serverUrl;
+  }
+
+  /**
+   * @param connectionTimeout connection timeout forwarded unchanged to the
+   *        client, which hands it to Avro's NettyTransceiver as the connect
+   *        timeout (milliseconds per the Avro API, not minutes)
+   */
+  @Required
+  public void setConnectionTimeout(int connectionTimeout) {
+    this.connectionTimeout = connectionTimeout;
+  }
+
+  /**
+   * @param requestTimeout request timeout forwarded unchanged to the client
+   *        constructor; no unit conversion is applied here
+   */
+  @Required
+  public void setRequestTimeout(int requestTimeout) {
+    this.requestTimeout = requestTimeout;
+  }
+
+  /**
+   * @param chunkSize chunk size forwarded unchanged to the client constructor
+   */
+  @Required
+  public void setChunkSize(int chunkSize) {
+    this.chunkSize = chunkSize;
+  }
+}
Added:
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServer.java
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServer.java?rev=1703325&view=auto
==============================================================================
---
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServer.java
(added)
+++
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServer.java
Wed Sep 16 03:37:00 2015
@@ -0,0 +1,427 @@
+package org.apache.oodt.cas.catalog.server.channel.avrorpc;
+
+//OODT imports
+import org.apache.oodt.cas.catalog.page.*;
+import org.apache.oodt.cas.catalog.query.QueryExpression;
+import
org.apache.oodt.cas.catalog.server.channel.AbstractCommunicationChannelServer;
+import org.apache.oodt.cas.catalog.struct.Dictionary;
+import org.apache.oodt.cas.catalog.struct.Index;
+import org.apache.oodt.cas.catalog.struct.TransactionId;
+import org.apache.oodt.cas.catalog.system.Catalog;
+import org.apache.oodt.cas.metadata.Metadata;
+
+//JDK imports
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+//APACHE imports
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.specific.SpecificResponder;
+
+public class AvrorpcCommunicationChannelServer extends
AbstractCommunicationChannelServer implements AvroCommunicationChannel {
+
+ private Server server;
+
+ /**
+  * Creates the server object without opening a socket; the Netty server is
+  * only created by {@code avrorpc_startup()}.
+  */
+ public AvrorpcCommunicationChannelServer() {
+ super();
+ }
+
+ @Override
+ public Void avrorpc_startup() throws AvroRemoteException {
+ this.server = new NettyServer(new
SpecificResponder(AvroCommunicationChannel.class,this),new
InetSocketAddress(this.port));
+ this.server.start();
+ return null;
+
+ }
+
+ /**
+  * Closes the Netty server created by {@code avrorpc_startup()} and forgets it.
+  * Calling this before startup, or a second time, is a no-op.
+  *
+  * @return always {@code true}
+  */
+ @Override
+ public boolean avrorpc_shutdown() throws AvroRemoteException {
+   // Guard against shutdown without a prior startup (server would be null).
+   if (this.server != null) {
+     this.server.close();
+     this.server = null;
+   }
+   return true;
+ }
+
+
+ /**
+  * Deserializes a {@link Catalog} from the request and passes it to {@code addCatalog}.
+  *
+  * @param catalogObject the catalog in this channel's serialized string form
+  * @return always {@code true} when no error occurred
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public boolean avrorpc_addCatalog1(String catalogObject) throws AvroRemoteException {
+   try {
+     this.addCatalog(this.serializer.deserializeObject(Catalog.class, catalogObject));
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+   return true;
+ }
+
+ /**
+  * Deserializes a {@link Catalog} from the request and passes it to {@code replaceCatalog}.
+  *
+  * @param catalogObject the catalog in this channel's serialized string form
+  * @return always {@code true} when no error occurred
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public boolean avrorpc_replaceCatalog(String catalogObject) throws AvroRemoteException {
+   try {
+     this.replaceCatalog(this.serializer.deserializeObject(Catalog.class, catalogObject));
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+   return true;
+ }
+
+ @Override
+ public boolean avrorpc_addCatalog2(String catalogId, String indexObject)
throws AvroRemoteException {
+ try {
+
this.addCatalog((String)catalogId,this.serializer.deserializeObject(Index.class,(String)indexObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_addCatalog3(String catalogId, String indexObject,
String dictionariesObject) throws AvroRemoteException {
+ try {
+ this.addCatalog((String)catalogId,
+
this.serializer.deserializeObject(Index.class,(String)indexObject),
+
this.serializer.deserializeObject(List.class,(String)dictionariesObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_addCatalog5(String catalogId, String indexObject,
String dictionariesObject, String restrictQueryPermissionObject, String
restrictIngestPermissionObject) throws AvroRemoteException {
+ try {
+ this.addCatalog((String)catalogId,
+
this.serializer.deserializeObject(Index.class,(String)indexObject),
+
this.serializer.deserializeObject(List.class,(String)dictionariesObject),
+
this.serializer.deserializeObject(Boolean.class,(String)restrictQueryPermissionObject),
+
this.serializer.deserializeObject(Boolean.class,(String)restrictIngestPermissionObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_addDictionary(String catalogId, String
dictionariesObject) throws AvroRemoteException {
+ try {
+ this.addDictionary((String)catalogId,
+
this.serializer.deserializeObject(Dictionary.class,(String)dictionariesObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_replaceDictionaries(String catalogId, String
dictionariesObject) throws AvroRemoteException {
+ try {
+ this.replaceDictionaries((String)catalogId,
+
this.serializer.deserializeObject(List.class,(String)dictionariesObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_replaceIndex(String catalogId, String indexObject)
throws AvroRemoteException {
+ try {
+
this.replaceIndex((String)catalogId,this.serializer.deserializeObject(Index.class,(String)indexObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_modifyIngestPermission(String catalogId, String
restrictIngestPermissionObject) throws AvroRemoteException {
+ try {
+ this.modifyIngestPermission((String)catalogId,
+
this.serializer.deserializeObject(Boolean.class,(String)restrictIngestPermissionObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ @Override
+ public boolean avrorpc_modifyQueryPermission(String catalogId, String
restrictQueryPermissionObject) throws AvroRemoteException {
+ try {
+ this.modifyQueryPermission((String)catalogId,
+
this.serializer.deserializeObject(Boolean.class,(String)restrictQueryPermissionObject));
+ } catch (Exception e) {
+ throw new AvroRemoteException(e.getMessage());
+ }
+ return true;
+ }
+
+ /**
+  * Deserializes a {@link Metadata} object and passes it to {@code delete}.
+  *
+  * @param metadataObject the metadata in this channel's serialized string form
+  * @return always {@code true} when no error occurred
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public boolean avrorpc_delete(String metadataObject) throws AvroRemoteException {
+   try {
+     this.delete(this.serializer.deserializeObject(Metadata.class, metadataObject));
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+   return true;
+ }
+
+ /**
+  * Returns the result of {@code getPluginUrls()} in serialized string form.
+  *
+  * @return the serialized plugin URL collection
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public String avrorpc_getPluginUrls() throws AvroRemoteException {
+   try {
+     return this.serializer.serializeObject(this.getPluginUrls());
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+ }
+
+ /**
+  * Deserializes a list of plugin URLs and passes it to {@code addPluginUrls}.
+  *
+  * @param pluginUrlsObject the list in this channel's serialized string form
+  * @return always {@code true} when no error occurred
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public boolean avrorpc_addPluginUrls(String pluginUrlsObject) throws AvroRemoteException {
+   try {
+     this.addPluginUrls(this.serializer.deserializeObject(List.class, pluginUrlsObject));
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+   return true;
+ }
+
+ /**
+  * Returns the result of {@code getPluginStorageDir()} in serialized string form.
+  *
+  * @return the serialized plugin storage location
+  * @throws AvroRemoteException carrying the message and cause of any failure
+  */
+ @Override
+ public String avrorpc_getPluginStorageDir() throws AvroRemoteException {
+   try {
+     return this.serializer.serializeObject(this.getPluginStorageDir());
+   } catch (Exception e) {
+     // Attach the original failure as cause so its stack trace is not lost.
+     throw new AvroRemoteException(e.getMessage(), e);
+   }
+ }
+
+ /**
+  * Writes {@code numBytes} bytes of {@code fileData}, starting at buffer index
+  * {@code offset}, to the file at {@code filePath}. An existing file is appended
+  * to; a missing file is created.
+  *
+  * @param filePath path of the destination file
+  * @param fileData buffer holding the bytes to write; {@code offset} is an absolute
+  *        index into this buffer (index 0 is the buffer's first element)
+  * @param offset index of the first byte to write
+  * @param numBytes number of bytes to write
+  * @return {@code true} once the bytes have been written
+  * @throws AvroRemoteException carrying the message (and cause) of any exception
+  *         raised while opening the file or writing to it
+  */
+ @Override
+ public boolean avrorpc_transferFile(String filePath, ByteBuffer fileData,
+         int offset, int numBytes) throws AvroRemoteException {
+     FileOutputStream fOut = null;
+     try {
+         File outFile = new File(filePath);
+         // Append when the file already exists, otherwise start a fresh file.
+         fOut = new FileOutputStream(outFile, outFile.exists());
+
+         if (fileData.hasArray()) {
+             // Buffer index 0 lives at arrayOffset() in the backing array; for the
+             // usual un-sliced buffer this is 0 and matches the previous behavior.
+             fOut.write(fileData.array(), fileData.arrayOffset() + offset, numBytes);
+         } else {
+             // Direct or read-only buffers expose no array: copy the requested range
+             // through a duplicate so the caller's position/limit are left untouched.
+             ByteBuffer view = fileData.duplicate();
+             view.clear();
+             view.position(offset);
+             byte[] chunk = new byte[numBytes];
+             view.get(chunk, 0, numBytes);
+             fOut.write(chunk, 0, numBytes);
+         }
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage(), e);
+     } finally {
+         // The stream is null when opening the file failed; nothing to close then.
+         if (fOut != null) {
+             try {
+                 fOut.close();
+             } catch (Exception ignored) {
+                 // Best-effort close, as before: a close failure is not reported.
+             }
+         }
+     }
+     return true;
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of {@code getAllPages}.
+  *
+  * @param queryPagerObject serialized {@code QueryPager}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getAllPages(String queryPagerObject) throws AvroRemoteException {
+     try {
+         QueryPager pager = this.serializer.deserializeObject(
+                 QueryPager.class, queryPagerObject);
+         return this.serializer.serializeObject(this.getAllPages(pager));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of the no-argument
+  * {@code getCalalogProperties()}.
+  *
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_getCalalogProperties1() throws AvroRemoteException {
+     try {
+         String serializedProperties =
+                 this.serializer.serializeObject(this.getCalalogProperties());
+         return serializedProperties;
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code getCalalogProperties(catalogUrn)}.
+  *
+  * @param catalogUrn passed through unchanged to the delegate
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_getCalalogProperties2(String catalogUrn) throws AvroRemoteException {
+     try {
+         String serializedProperties =
+                 this.serializer.serializeObject(this.getCalalogProperties(catalogUrn));
+         return serializedProperties;
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the {@code (TransactionId, String)} form of
+  * {@code getCatalogServiceTransactionId}.
+  *
+  * @param catalogTransactionIdObject serialized {@code TransactionId}, decoded with the serializer
+  * @param catalogUrn passed through unchanged as the second delegate argument
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getCatalogServiceTransactionId(String catalogTransactionIdObject,
+         String catalogUrn) throws AvroRemoteException {
+     try {
+         // Raw TransactionId on purpose: the serializer is asked for TransactionId.class.
+         TransactionId catalogTransactionId = this.serializer.deserializeObject(
+                 TransactionId.class, catalogTransactionIdObject);
+         return this.serializer.serializeObject(
+                 this.getCatalogServiceTransactionId(catalogTransactionId, catalogUrn));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the {@code (CatalogReceipt, Boolean)} form of
+  * {@code getCatalogServiceTransactionId}.
+  *
+  * @param catalogReceiptObject serialized {@code CatalogReceipt}, decoded with the serializer
+  * @param generateNewObject serialized {@link Boolean}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getCatalogServiceTransactionId2(String catalogReceiptObject,
+         String generateNewObject) throws AvroRemoteException {
+     try {
+         CatalogReceipt receipt = this.serializer.deserializeObject(
+                 CatalogReceipt.class, catalogReceiptObject);
+         Boolean generateNew = this.serializer.deserializeObject(
+                 Boolean.class, generateNewObject);
+         return this.serializer.serializeObject(
+                 this.getCatalogServiceTransactionId(receipt, generateNew));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code getCatalogServiceTransactionIds}.
+  *
+  * @param catalogTransactionIdsObject serialized {@link List}, decoded with the serializer
+  * @param catalogUrn passed through unchanged as the second delegate argument
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getCatalogServiceTransactionIds(String catalogTransactionIdsObject,
+         String catalogUrn) throws AvroRemoteException {
+     try {
+         // Raw List on purpose: the serializer is asked for List.class, exactly as before.
+         List catalogTransactionIds = this.serializer.deserializeObject(
+                 List.class, catalogTransactionIdsObject);
+         return this.serializer.serializeObject(
+                 this.getCatalogServiceTransactionIds(catalogTransactionIds, catalogUrn));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of {@code getCurrentCatalogIds()}.
+  *
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_getCurrentCatalogIds() throws AvroRemoteException {
+     try {
+         String serializedIds = this.serializer.serializeObject(this.getCurrentCatalogIds());
+         return serializedIds;
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code getMetadataFromTransactionIdStrings}.
+  *
+  * @param catalogServiceTransactionIdStringsObject serialized {@link List}, decoded
+  *        with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getMetadataFromTransactionIdStrings(
+         String catalogServiceTransactionIdStringsObject) throws AvroRemoteException {
+     try {
+         // Raw List on purpose: the serializer is asked for List.class, exactly as before.
+         List transactionIdStrings = this.serializer.deserializeObject(
+                 List.class, catalogServiceTransactionIdStringsObject);
+         return this.serializer.serializeObject(
+                 this.getMetadataFromTransactionIdStrings(transactionIdStrings));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code getMetadataFromTransactionIds}.
+  *
+  * @param catalogServiceTransactionIdsObject serialized {@link List}, decoded with
+  *        the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getMetadataFromTransactionIds(
+         String catalogServiceTransactionIdsObject) throws AvroRemoteException {
+     try {
+         // Raw List on purpose: the serializer is asked for List.class, exactly as before.
+         List transactionIds = this.serializer.deserializeObject(
+                 List.class, catalogServiceTransactionIdsObject);
+         return this.serializer.serializeObject(
+                 this.getMetadataFromTransactionIds(transactionIds));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the {@code QueryPager} form of {@code getNextPage}.
+  *
+  * @param queryPagerObject serialized {@code QueryPager}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getNextPage(String queryPagerObject) throws AvroRemoteException {
+     try {
+         QueryPager pager = this.serializer.deserializeObject(
+                 QueryPager.class, queryPagerObject);
+         return this.serializer.serializeObject(this.getNextPage(pager));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of {@code getProperty(key)}.
+  *
+  * @param key passed through unchanged to the delegate
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_getProperty(String key) throws AvroRemoteException {
+     try {
+         String serializedValue = this.serializer.serializeObject(this.getProperty(key));
+         return serializedValue;
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of {@code ingest}.
+  *
+  * @param metadataObject serialized {@code Metadata}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_ingest(String metadataObject) throws AvroRemoteException {
+     try {
+         Metadata toIngest = this.serializer.deserializeObject(
+                 Metadata.class, metadataObject);
+         return this.serializer.serializeObject(this.ingest(toIngest));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code isRestrictIngestPermissions()}, boxed as a {@link Boolean}.
+  *
+  * @return the serializer's string form of the boxed flag
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_isRestrictIngestPermissions() throws AvroRemoteException {
+     try {
+         // Boolean.valueOf reuses the cached TRUE/FALSE instances instead of allocating.
+         return this.serializer.serializeObject(
+                 Boolean.valueOf(this.isRestrictIngestPermissions()));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of
+  * {@code isRestrictQueryPermissions()}, boxed as a {@link Boolean}.
+  *
+  * @return the serializer's string form of the boxed flag
+  * @throws AvroRemoteException carrying the message of any exception raised by the
+  *         delegate or the serializer
+  */
+ @Override
+ public String avrorpc_isRestrictQueryPermissions() throws AvroRemoteException {
+     try {
+         // Boolean.valueOf reuses the cached TRUE/FALSE instances instead of allocating.
+         return this.serializer.serializeObject(
+                 Boolean.valueOf(this.isRestrictQueryPermissions()));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the single-argument form of {@code query}.
+  *
+  * @param queryExpressionObject serialized {@code QueryExpression}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_query1(String queryExpressionObject) throws AvroRemoteException {
+     try {
+         QueryExpression expression = this.serializer.deserializeObject(
+                 QueryExpression.class, queryExpressionObject);
+         return this.serializer.serializeObject(this.query(expression));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the two-argument form of {@code query}.
+  *
+  * @param queryExpressionObject serialized {@code QueryExpression}, decoded with the serializer
+  * @param catalogIdsObject serialized {@link Set}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_query2(String queryExpressionObject, String catalogIdsObject)
+         throws AvroRemoteException {
+     try {
+         QueryExpression expression = this.serializer.deserializeObject(
+                 QueryExpression.class, queryExpressionObject);
+         // Raw Set on purpose: the serializer is asked for Set.class, exactly as before.
+         Set catalogIds = this.serializer.deserializeObject(Set.class, catalogIdsObject);
+         return this.serializer.serializeObject(this.query(expression, catalogIds));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the {@code Page} form of {@code getNextPage}.
+  *
+  * @param pageObject serialized {@code Page}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getNextPage2(String pageObject) throws AvroRemoteException {
+     try {
+         Page currentPage = this.serializer.deserializeObject(Page.class, pageObject);
+         return this.serializer.serializeObject(this.getNextPage(currentPage));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the two-argument form of {@code getPage}.
+  *
+  * @param pageInfoObject serialized {@code PageInfo}, decoded with the serializer
+  * @param queryExpressionObject serialized {@code QueryExpression}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getPage2(String pageInfoObject, String queryExpressionObject)
+         throws AvroRemoteException {
+     try {
+         PageInfo pageInfo = this.serializer.deserializeObject(
+                 PageInfo.class, pageInfoObject);
+         QueryExpression expression = this.serializer.deserializeObject(
+                 QueryExpression.class, queryExpressionObject);
+         return this.serializer.serializeObject(this.getPage(pageInfo, expression));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point for the three-argument form of {@code getPage}.
+  *
+  * @param pageInfoObject serialized {@code PageInfo}, decoded with the serializer
+  * @param queryExpressionObject serialized {@code QueryExpression}, decoded with the serializer
+  * @param catalogIdsObject serialized {@link Set}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getPage3(String pageInfoObject, String queryExpressionObject,
+         String catalogIdsObject) throws AvroRemoteException {
+     try {
+         PageInfo pageInfo = this.serializer.deserializeObject(
+                 PageInfo.class, pageInfoObject);
+         QueryExpression expression = this.serializer.deserializeObject(
+                 QueryExpression.class, queryExpressionObject);
+         // Raw Set on purpose: the serializer is asked for Set.class, exactly as before.
+         Set catalogIds = this.serializer.deserializeObject(Set.class, catalogIdsObject);
+         return this.serializer.serializeObject(
+                 this.getPage(pageInfo, expression, catalogIds));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that returns the serialized result of {@code getMetadata}.
+  *
+  * @param pageObject serialized {@code Page}, decoded with the serializer
+  * @return the serializer's string form of the delegate's result
+  * @throws AvroRemoteException carrying the message of any exception raised while
+  *         decoding, delegating or serializing
+  */
+ @Override
+ public String avrorpc_getMetadata(String pageObject) throws AvroRemoteException {
+     try {
+         Page page = this.serializer.deserializeObject(Page.class, pageObject);
+         return this.serializer.serializeObject(this.getMetadata(page));
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Avro RPC entry point that forwards to {@code removeCatalog}.
+  *
+  * @param catalogUrn passed through unchanged to the delegate
+  * @return {@code true} once the delegate call has completed
+  * @throws AvroRemoteException carrying the message of any exception raised by the delegate
+  */
+ @Override
+ public boolean avrorpc_removeCatalog(String catalogUrn) throws AvroRemoteException {
+     try {
+         this.removeCatalog(catalogUrn);
+         return true;
+     } catch (Exception e) {
+         throw new AvroRemoteException(e.getMessage());
+     }
+ }
+
+ /**
+  * Starts this server by delegating to {@code avrorpc_startup()}.
+  *
+  * @throws Exception whatever {@code avrorpc_startup()} raises
+  */
+ @Override
+ public void startup() throws Exception {
+     this.avrorpc_startup();
+ }
+}
+
Added:
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServerFactory.java
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServerFactory.java?rev=1703325&view=auto
==============================================================================
---
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServerFactory.java
(added)
+++
oodt/branches/avro_rpc/catalog/src/main/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/AvrorpcCommunicationChannelServerFactory.java
Wed Sep 16 03:37:00 2015
@@ -0,0 +1,52 @@
+package org.apache.oodt.cas.catalog.server.channel.avrorpc;
+
+
+//OODT imports
+import org.apache.oodt.cas.catalog.server.channel.CommunicationChannelServer;
+import
org.apache.oodt.cas.catalog.server.channel.CommunicationChannelServerFactory;
+import org.apache.oodt.cas.catalog.system.CatalogServiceFactory;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//FRAMEWORK imports
+import org.springframework.beans.factory.annotation.Required;
+
+/**
+ * Factory for {@link AvrorpcCommunicationChannelServer} instances. Each server is
+ * given a catalog service from the configured {@link CatalogServiceFactory} and the
+ * configured port.
+ */
+public class AvrorpcCommunicationChannelServerFactory implements
+        CommunicationChannelServerFactory {
+
+    private static final Logger LOG =
+            Logger.getLogger(AvrorpcCommunicationChannelServerFactory.class.getName());
+
+    /** Port assigned to every server this factory creates. */
+    protected int port;
+
+    /** Source of the catalog service handed to every server this factory creates. */
+    protected CatalogServiceFactory catalogServiceFactory;
+
+    public AvrorpcCommunicationChannelServerFactory() {}
+
+    /**
+     * Creates a server, sets its catalog service and port, and returns it.
+     *
+     * @return the configured server, or {@code null} when any step throws; the
+     *         failure is logged at SEVERE level and not propagated
+     */
+    @Override
+    public CommunicationChannelServer createCommunicationChannelServer() {
+        try {
+            AvrorpcCommunicationChannelServer server =
+                    new AvrorpcCommunicationChannelServer();
+            server.setCatalogService(this.catalogServiceFactory.createCatalogService());
+            server.setPort(this.port);
+            return server;
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Failed to create AVRO-RPC server : " +
+                    e.getMessage(), e);
+            return null;
+        }
+    }
+
+    /**
+     * @param port port assigned to servers created by this factory
+     */
+    @Required
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * @return the port assigned to servers created by this factory
+     */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * @param catalogServiceFactory factory used to obtain the catalog service for
+     *        each created server
+     */
+    @Required
+    public void setCatalogServiceFactory(CatalogServiceFactory catalogServiceFactory) {
+        this.catalogServiceFactory = catalogServiceFactory;
+    }
+
+}
Added:
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/TestAvrorpcCommunicationChannelServer.java
URL:
http://svn.apache.org/viewvc/oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/TestAvrorpcCommunicationChannelServer.java?rev=1703325&view=auto
==============================================================================
---
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/TestAvrorpcCommunicationChannelServer.java
(added)
+++
oodt/branches/avro_rpc/catalog/src/test/java/org/apache/oodt/cas/catalog/server/channel/avrorpc/TestAvrorpcCommunicationChannelServer.java
Wed Sep 16 03:37:00 2015
@@ -0,0 +1,145 @@
+package org.apache.oodt.cas.catalog.server.channel.avrorpc;
+
+import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.oodt.cas.catalog.mapping.InMemoryIngestMapperFactory;
+import
org.apache.oodt.cas.catalog.repository.MemoryBasedCatalogRepositoryFactory;
+import org.apache.oodt.cas.catalog.server.channel.CommunicationChannelClient;
+import org.apache.oodt.cas.catalog.server.channel.CommunicationChannelServer;
+import org.apache.oodt.cas.catalog.struct.impl.index.DataSourceIndexFactory;
+import org.apache.oodt.cas.catalog.struct.impl.index.InMemoryIndexFactory;
+import
org.apache.oodt.cas.catalog.struct.impl.transaction.UuidTransactionIdFactory;
+import org.apache.oodt.cas.catalog.system.CatalogFactory;
+import org.apache.oodt.cas.catalog.system.impl.CatalogServiceLocal;
+import org.apache.oodt.cas.catalog.system.impl.CatalogServiceLocalFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.SQLException;
+
+/**
+ * Starts an Avro RPC communication channel server on port 9999, backed by a local
+ * catalog service holding two test catalogs, connects a client to it, and checks
+ * that the client sees the same catalog ids as the server.
+ */
+public class TestAvrorpcCommunicationChannelServer extends TestCase {
+
+    private CommunicationChannelServer server;
+    private CommunicationChannelClient client;
+
+    private CatalogServiceLocal cs;
+    private File testDir;
+
+    /**
+     * Builds the catalog service with two catalogs, starts the server and creates
+     * the client. Any exception fails the test.
+     */
+    public void setUp() {
+        try {
+            //copied start
+            File tempFile = File.createTempFile("foo1", "bar2");
+            tempFile.deleteOnExit();
+            testDir = new File(tempFile.getParentFile(), "cas-catalog2");
+
+            CatalogServiceLocalFactory factory = new CatalogServiceLocalFactory();
+            factory.setCatalogRepositoryFactory(new MemoryBasedCatalogRepositoryFactory());
+            factory.setIngestMapperFactory(this
+                    .getOracleIngestMapperFactory(testDir.getAbsolutePath() + "/mapper"));
+            factory.setOneCatalogFailsAllFail(true);
+            factory.setSimplifyQueries(true);
+            factory.setPluginStorageDir("/dev/null");
+            factory.setRestrictIngestPermissions(false);
+            factory.setRestrictQueryPermissions(false);
+            factory.setTransactionIdFactory(UuidTransactionIdFactory.class
+                    .getCanonicalName());
+
+            cs = factory.createCatalogService();
+
+            CatalogFactory catalogFactory = new CatalogFactory();
+            catalogFactory.setCatalogId("TestCatalog1");
+            catalogFactory.setDictionaryFactories(null);
+            catalogFactory
+                    .setIndexFactory(getInMemoryDSFactory(testDir.getAbsolutePath() + "/index/1/"));
+            catalogFactory.setRestrictIngestPermissions(false);
+            catalogFactory.setRestrictQueryPermissions(false);
+            cs.addCatalog(catalogFactory.createCatalog());
+            catalogFactory.setCatalogId("TestCatalog2");
+            catalogFactory
+                    .setIndexFactory(getInMemoryDSFactory(testDir.getAbsolutePath() + "/index/2/"));
+            cs.addCatalog(catalogFactory.createCatalog());
+            //copied end
+
+            AvrorpcCommunicationChannelServerFactory serverFactory =
+                    new AvrorpcCommunicationChannelServerFactory();
+            serverFactory.setCatalogServiceFactory(factory);
+
+            server = serverFactory.createCommunicationChannelServer();
+            server.setPort(9999);
+
+            // Replace the service the factory created with the one holding the test catalogs.
+            server.setCatalogService(cs);
+            server.startup();
+
+            AvrorpcCommunicationChannelClientFactory clientFactory =
+                    new AvrorpcCommunicationChannelClientFactory();
+            clientFactory.setChunkSize(200);
+            clientFactory.setConnectionTimeout(2000);
+            clientFactory.setRequestTimeout(2000);
+
+            clientFactory.setServerUrl("http://localhost:" + server.getPort());
+            client = clientFactory.createCommunicationChannelClient();
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            TestCase.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Deletes the working directory used by the test databases.
+     * NOTE(review): the server started in setUp is not stopped here, so port 9999
+     * may stay bound -- confirm whether CommunicationChannelServer offers a
+     * shutdown hook that should be called.
+     */
+    public void tearDown() {
+        try {
+            FileUtils.forceDelete(this.testDir);
+        } catch (IOException e) {
+            e.printStackTrace();
+            TestCase.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Checks that server and client exist, that the client can fetch catalog
+     * properties, and that both report the same current catalog ids.
+     */
+    public void testServer() {
+        assertNotNull(server);
+        assertNotNull(client);
+        try {
+            assertNotNull(client.getCalalogProperties());
+            assertEquals(server.getCurrentCatalogIds(), client.getCurrentCatalogIds());
+        } catch (Exception e) {
+            e.printStackTrace();
+            // fail() always fails the test; the assert keyword only fires under -ea.
+            TestCase.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * Builds an in-memory ingest mapper factory pointing at an HSQLDB file database
+     * under {@code tmpDirPath}, initialised from {@code /test-mapper-cat.sql}.
+     */
+    private InMemoryIngestMapperFactory getOracleIngestMapperFactory(
+            String tmpDirPath) throws SQLException, IOException {
+        String user = "sa";
+        String pass = "";
+        String driver = "org.hsqldb.jdbcDriver";
+        String url = "jdbc:hsqldb:file:" + tmpDirPath + ";shutdown=true";
+
+        InMemoryIngestMapperFactory factory = new InMemoryIngestMapperFactory();
+        factory.setDriver(driver);
+        factory.setJdbcUrl(url);
+        factory.setPass(pass);
+        factory.setUser(user);
+        factory.setTablesFile(this.getClass().getResource("/test-mapper-cat.sql").getPath());
+        return factory;
+    }
+
+    /**
+     * Builds an in-memory index factory pointing at an HSQLDB file database under
+     * {@code tmpDirPath}, initialised from {@code /test-index-cat.sql}.
+     */
+    private DataSourceIndexFactory getInMemoryDSFactory(String tmpDirPath)
+            throws IOException, SQLException {
+        String user = "sa";
+        String pass = "";
+        String driver = "org.hsqldb.jdbcDriver";
+        String url = "jdbc:hsqldb:file:" + tmpDirPath + ";shutdown=true";
+
+        InMemoryIndexFactory indexFactory = new InMemoryIndexFactory();
+        indexFactory.setDriver(driver);
+        indexFactory.setJdbcUrl(url);
+        indexFactory.setPass(pass);
+        indexFactory.setUser(user);
+        indexFactory.setTablesFile(this.getClass().getResource("/test-index-cat.sql").getPath());
+        return indexFactory;
+    }
+
+}