This is an automated email from the ASF dual-hosted git repository.
abaker pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 67d2241 GEODE-4593 Remove unused MigrationClient/Server
67d2241 is described below
commit 67d2241be0ffcf1cd16bed48cfb6a74883db5dba
Author: Anthony Baker <[email protected]>
AuthorDate: Fri Jul 20 12:21:06 2018 -0700
GEODE-4593 Remove unused MigrationClient/Server
Removing unused, obsolete, internal classes. MigrationClient/Server
were used a long time ago to transfer data between clusters using
different serialization formats. This code is no longer necessary.
---
.../org/apache/geode/internal/MigrationClient.java | 261 ----------
.../org/apache/geode/internal/MigrationServer.java | 559 ---------------------
2 files changed, 820 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/MigrationClient.java b/geode-core/src/main/java/org/apache/geode/internal/MigrationClient.java
deleted file mode 100755
index b181eac..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/MigrationClient.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal;
-
-import static
org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Properties;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.net.SocketCreator;
-
-/**
- * MigrationClient is used to retrieve all of the data for a region from a
MigrationServer. First
- * start a MigrationServer using one version of GemFire, then connect to it
using a MigrationClient
- * with another version of GemFire.
- * <p>
- * Command line arguments are<br>
- * region name (required)<br>
- * cache-xml-file-name (required)<br>
- * server port (defaults to 10553)<br>
- * server address (defaults to local host)
- * <p>
- * The region should be defined in the cache-xml file, and must also be
defined in the server's
- * cache-xml file.
- * <p>
- * Typically, the cache-xml file will be exactly the same as the one used by
the MigrationServer
- * with different disk-dirs settings. When Region entries are transferred from
the server to the
- * client, they are then stored in new files in these directories.
- *
- * @since GemFire 6.0.1
- */
-public class MigrationClient {
- private static final boolean VERBOSE = MigrationServer.VERBOSE;
-
- // version for backward communications compatibility
- private static final int VERSION = 551;
-
- private static final int CODE_ERROR = MigrationServer.CODE_ERROR;
-
- /* serialized key, serialized value */
- private static final int CODE_ENTRY = MigrationServer.CODE_ENTRY;
-
- private static final int CODE_COMPLETED = MigrationServer.CODE_COMPLETED;
-
- public static void main(String[] args) throws Exception {
- int argIdx = 0;
- String cacheXmlFileName;
- String regionName;
-
- if (args.length > argIdx + 1) {
- regionName = args[argIdx++];
- cacheXmlFileName = args[argIdx++];
- } else {
- System.err
- .println("MigrationClient regionName [cache-xml-file] [server-port]
[server-address]");
- return;
- }
- int serverPort = 10533;
- if (args.length > argIdx) {
- serverPort = Integer.parseInt(args[argIdx++]);
- }
- String bindAddressName = null;
- if (args.length > argIdx) {
- bindAddressName = args[argIdx++];
- }
-
- MigrationClient instance = null;
- try {
- instance = new MigrationClient(cacheXmlFileName, bindAddressName,
serverPort);
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- ExitCode.FATAL.doSystemExit();
- }
- instance.createDistributedSystem();
- instance.createCache();
- instance.getRegion(regionName);
- }
-
- private final InetAddress serverAddress;
- private final int port;
- private DistributedSystem distributedSystem;
- private File cacheXmlFile;
- private Cache cache;
- private Socket server;
- private int serverVersion;
- private DataInputStream dis;
- private DataOutputStream dos;
-
- /**
- * Create a MigrationClient to be used with a DistributedSystem and Cache
that are created using
- * GemFire APIs
- *
- * @param bindAddressName the server's address
- * @param serverPort the server's port
- */
- private MigrationClient(String bindAddressName, int serverPort) {
- this.port = serverPort;
- try {
- this.serverAddress = InetAddress.getByName(bindAddressName);
- } catch (IOException ignore) {
- throw new IllegalArgumentException(
- "Error - bind address cannot be resolved: '" + bindAddressName +
'\'');
- }
- }
-
- /**
- * this is for use by main()
- *
- * @param cacheXmlFileName the name of the xml file describing the cache, or
null
- * @param bindAddressName the name of the NIC to bind to, or null
- * @param serverPort the port to connect to (must not be zero)
- */
- private MigrationClient(String cacheXmlFileName, String bindAddressName, int
serverPort) {
- this(bindAddressName, serverPort);
- this.cacheXmlFile = new File(cacheXmlFileName);
- if (!this.cacheXmlFile.exists()) {
- // in 6.x this should be localizable
- System.err
- .println("Warning - file not found in local directory: '" +
cacheXmlFileName + '\'');
- }
- }
-
- /**
- * Create a distributed system. If this method is not invoked before running
the MigrationServer,
- * an existing distributed system must exist for the server to use.
- *
- * @throws Exception if there are any problems
- */
- private void createDistributedSystem() throws Exception {
- Properties dsProps = new Properties();
- // if no discovery information has been explicitly given, use a loner ds
- if (System.getProperty(DistributionConfig.GEMFIRE_PREFIX + MCAST_PORT) ==
null
- && System.getProperty(DistributionConfig.GEMFIRE_PREFIX + LOCATORS) ==
null) {
- dsProps.put(MCAST_PORT, "0");
- }
- dsProps.put(LOG_FILE, "migrationClient.log");
- if (this.cacheXmlFile != null) {
- dsProps.put(CACHE_XML_FILE, this.cacheXmlFile.getName());
- }
- this.distributedSystem = DistributedSystem.connect(dsProps);
- }
-
- /**
- * create the cache to be used by this migration server
- *
- * @throws Exception if there are any problems
- */
- private void createCache() throws Exception {
- if (this.distributedSystem == null) {
- this.distributedSystem =
InternalDistributedSystem.getConnectedInstance();
- }
- this.cache = CacheFactory.create(this.distributedSystem);
- }
-
- private void initDSAndCache() {
- if (this.distributedSystem == null) {
- this.distributedSystem =
InternalDistributedSystem.getConnectedInstance();
- }
- if (this.cache == null) {
- this.cache = GemFireCacheImpl.getInstance();
- }
- }
-
- public Region getRegion(String regionName) throws IOException,
ClassNotFoundException {
- initDSAndCache();
- Region region = this.cache.getRegion(regionName);
- try {
- connectToServer();
- if (this.serverVersion != VERSION) {
- System.out.println("Don't know how to deal with version " +
this.serverVersion);
- throw new IOException("Server has incompatible version of
MigrationServer");
- }
-
- this.dos.writeShort(MigrationServer.ClientRequest.REGION_REQUEST);
- this.dos.writeUTF(regionName);
- this.dos.flush();
-
- boolean done = false;
- while (!done) {
- int responseCode = -1;
- try {
- responseCode = this.dis.readShort();
- } catch (EOFException ignore) {
- }
- switch (responseCode) {
- case -1:
- throw new IOException("Server socket was closed while receiving
entries");
- case CODE_COMPLETED:
- done = true;
- break;
- case CODE_ERROR:
- String errorString = this.dis.readUTF();
- System.err.println("Server responded with error: '" + errorString
+ '\'');
- throw new IOException(errorString);
- case CODE_ENTRY:
- Object key = new
ObjectInputStream(this.server.getInputStream()).readObject();
- Object value = new
ObjectInputStream(this.server.getInputStream()).readObject();
- if (VERBOSE) {
- System.out.println("received " + key);
- }
- region.put(key, value);
- break;
- }
- }
- } finally {
- if (this.server != null && !this.server.isClosed()) {
- this.server.close();
- }
- }
- return region;
- }
-
- private void connectToServer() throws IOException {
- this.server = new Socket();
- SocketAddress addr;
- if (this.serverAddress != null) {
- addr = new InetSocketAddress(this.serverAddress, this.port);
- } else {
- addr = new InetSocketAddress(SocketCreator.getLocalHost(), this.port);
- }
- if (VERBOSE) {
- System.out.println("connecting to " + addr);
- }
- this.server.connect(addr);
- this.dos = new DataOutputStream(this.server.getOutputStream());
- this.dos.writeShort(VERSION);
- this.dis = new DataInputStream(this.server.getInputStream());
- this.serverVersion = this.dis.readShort();
- }
-}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/MigrationServer.java b/geode-core/src/main/java/org/apache/geode/internal/MigrationServer.java
deleted file mode 100755
index 35df445..0000000
--- a/geode-core/src/main/java/org/apache/geode/internal/MigrationServer.java
+++ /dev/null
@@ -1,559 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
- * agreements. See the NOTICE file distributed with this work for additional
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
- * or implied. See the License for the specific language governing permissions
and limitations under
- * the License.
- */
-package org.apache.geode.internal;
-
-import static
org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Enumeration;
-import java.util.Properties;
-
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.admin.internal.InetAddressUtil;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.Region.Entry;
-import org.apache.geode.distributed.DistributedSystem;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * MigrationServer creates a cache using a supplied cache.xml and then opens a
server socket that a
- * MigrationClient connects to and requests the data from a Region.
MigrationServer sends the data
- * to the MigrationClient using normal java serialization in order to allow
migration from
- * incompatible versions of DataSerializer. Keys and values stored in the
cache must serialize and
- * deserialize correctly.
- * <p>
- * Command line arguments are<br>
- * cache-xml-file-name (required)<br>
- * listen port (defaults to 10553)<br>
- * bind address (defaults to listing on all interfaces)<br>
- * <p>
- * Both the MigrationClient and MigrationServer must be configured to have the
appropriate domain
- * classes in their CLASSPATH, or errors will be encountered during
deserialization.
- * <P>
- * Details of the transfers can be viewed by setting the system property
Migration.VERBOSE=true.
- * <p>
- * For example,
- *
- * <pre>
- * java -cp $MYCLASSES:migration.jar:$GEMFIRE/lib/geode-dependencies.jar \
- * org.apache.geode.internal.MigrationServer cacheDescription.xml
- * </pre>
- * <p>
- * Where the cacheDescription.xml file might look like this:
- *
- * <pre>
- * <!DOCTYPE cache PUBLIC
- "-//GemStone Systems, Inc.//GemFire Declarative Caching 5.7//EN"
- "http://www.gemstone.com/dtd/cache5_7.dtd">
-<cache is-server="false">
- <region name="root">
- <region-attributes scope="distributed-no-ack">
- </region-attributes>
-
- <region name="Test">
- <region-attributes data-policy="persistent-replicate">
-
- <disk-write-attributes>
- <synchronous-writes/>
- </disk-write-attributes>
-
- <disk-dirs>
- <disk-dir>diskfiles</disk-dir>
- </disk-dirs>
-
- <eviction-attributes>
- <lru-memory-size maximum="100" action="overflow-to-disk"/>
- </eviction-attributes>
-
- </region-attributes>
- </region> <!-- Test region -->
- </region> <!-- root region -->
-</cache>
- *
- * </pre>
- * <p>
- * The client is then run with a different cache description having different
disk-dirs to hold the
- * migrated information.
- *
- * @since GemFire 6.0.1
- */
-public class MigrationServer {
- private static final Logger logger = LogService.getLogger();
-
- static final boolean VERBOSE = Boolean.getBoolean("Migration.VERBOSE");
-
- // version for backward communications compatibility
- private static final int VERSION = 551;
-
- static final int CODE_ERROR = 0;
-
- /* serialized key, serialized value */
- static final int CODE_ENTRY = 1;
-
- static final int CODE_COMPLETED = 2;
-
- public static void main(String[] args) throws Exception {
- int argIdx = 0;
- String cacheXmlFileName = "cache.xml";
-
- if (args.length > 0) {
- cacheXmlFileName = args[argIdx++];
- } else {
- System.err.println("MigrationServer cache-xml-file [server-address]
[server-port]");
- }
- int listenPort = 10533;
- if (args.length > argIdx) {
- listenPort = Integer.parseInt(args[argIdx++]);
- }
- String bindAddressName = null;
- if (args.length > argIdx) {
- bindAddressName = args[argIdx++];
- }
-
- MigrationServer instance = null;
- try {
- instance = new MigrationServer(cacheXmlFileName, bindAddressName,
listenPort);
- } catch (IllegalArgumentException e) {
- System.err.println(e.getMessage());
- ExitCode.FATAL.doSystemExit();
- }
- instance.createDistributedSystem();
- instance.createCache();
- instance.serve();
- }
-
- private InetAddress bindAddress;
- private final int listenPort;
- private ServerSocket serverSocket;
- private DistributedSystem distributedSystem;
- private File cacheXmlFile;
- private Cache cache;
-
- /**
- * Create a MigrationServer to be used with a DistributedSystem and Cache
that are created using
- * GemFire APIs
- *
- * @param bindAddressName the NIC to bind to, or null to use all interfaces
- * @param listenPort the port to listen on
- */
- private MigrationServer(String bindAddressName, int listenPort) {
- this.listenPort = listenPort;
- if (bindAddressName != null) {
- if (!isLocalHost(bindAddressName)) {
- throw new IllegalArgumentException(
- "Error - bind address is not an address of this machine: '" +
bindAddressName + '\'');
- }
- try {
- this.bindAddress = InetAddress.getByName(bindAddressName);
- } catch (IOException ignore) {
- throw new IllegalArgumentException(
- "Error - bind address cannot be resolved: '" + bindAddressName +
'\'');
- }
- }
- try {
- if (this.bindAddress != null) {
- this.serverSocket = new ServerSocket();
- SocketAddress addr = new InetSocketAddress(this.bindAddress,
listenPort);
- this.serverSocket.bind(addr);
- } else {
- this.serverSocket = new ServerSocket(listenPort);
- }
- if (VERBOSE) {
- System.out.println("created server socket " + this.serverSocket);
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("Port is already in use", e);
- }
- }
-
- /**
- * this is for use by main()
- *
- * @param cacheXmlFileName the name of the xml file describing the cache, or
null
- * @param bindAddressName the name of the NIC to bind to, or null
- * @param listenPort the port to listen on (must not be zero)
- */
- private MigrationServer(String cacheXmlFileName, String bindAddressName, int
listenPort) {
- this(bindAddressName, listenPort);
- this.cacheXmlFile = new File(cacheXmlFileName);
- if (!this.cacheXmlFile.exists()) {
- // in 6.x this should be localizable
- System.err
- .println("Warning - file not found in local directory: '" +
cacheXmlFileName + '\'');
- }
- }
-
- /**
- * Create a distributed system. If this method is not invoked before running
the MigrationServer,
- * an existing distributed system must exist for the server to use.
- *
- * @throws Exception if there are any problems
- */
- private void createDistributedSystem() throws Exception {
- Properties dsProps = new Properties();
- // if no discovery information has been explicitly given, use a loner ds
- if (System.getProperty(DistributionConfig.GEMFIRE_PREFIX + MCAST_PORT) ==
null
- && System.getProperty(DistributionConfig.GEMFIRE_PREFIX + LOCATORS) ==
null) {
- dsProps.put(MCAST_PORT, "0");
- }
- dsProps.put(LOG_FILE, "migrationServer.log");
- if (this.cacheXmlFile != null) {
- dsProps.put(CACHE_XML_FILE, this.cacheXmlFile.getName());
- }
- this.distributedSystem = DistributedSystem.connect(dsProps);
- if (VERBOSE) {
- System.out.println("created distributed system " +
this.distributedSystem);
- }
- }
-
- /**
- * create the cache to be used by this migration server
- *
- * @throws Exception if there are any problems
- */
- private void createCache() throws Exception {
- if (this.distributedSystem == null) {
- this.distributedSystem =
InternalDistributedSystem.getConnectedInstance();
- }
- this.cache = CacheFactory.create(this.distributedSystem);
- if (VERBOSE) {
- System.out.println("created cache " + this.cache);
- }
- }
-
- /**
- * This locates the distributed system and cache, if they have not been
created by this server,
- * and then listens for requests from MigrationClient processes.
- *
- * @throws IllegalStateException if an attempt is made to reuse a server
that has been stopped
- */
- public void serve() throws Exception {
- if (this.serverSocket == null || this.serverSocket.isClosed()) {
- throw new IllegalStateException("This server has been closed and cannot
be reused");
- }
- try {
- if (this.distributedSystem == null) {
- this.distributedSystem =
InternalDistributedSystem.getConnectedInstance();
- }
- if (this.cache == null) {
- this.cache = GemFireCacheImpl.getInstance();
- }
- if (this.bindAddress != null) {
- System.out.println("Migration server on port " + this.listenPort + "
bound to "
- + this.bindAddress + " is ready for client requets");
- } else {
- System.out.println(
- "Migration server on port " + this.listenPort + " is ready for
client requests");
- }
- for (;;) {
- if (Thread.interrupted() || this.serverSocket.isClosed()) {
- return;
- }
- Socket clientSocket;
- try {
- clientSocket = this.serverSocket.accept();
- } catch (SocketException ignored) {
- return;
- }
- new RequestHandler(clientSocket).serveClientRequest();
- }
- } finally {
- System.out.println("Closing migration server");
- try {
- this.serverSocket.close();
- } catch (Exception ignore) {
- this.serverSocket = null;
- }
- }
- }
-
- /**
- * this causes the migration server to stop serving after it finishes
dispatching any in-process
- * requests
- *
- * @throws IOException if there is a problem closing the server socket
- */
- public void stop() throws IOException {
- if (this.serverSocket != null && !this.serverSocket.isClosed()) {
- this.serverSocket.close();
- }
- }
-
- /**
- * get the cache being used by this migration server
- *
- * @return the cache, or null if a cache has not yet been associated with
this server
- */
- public Cache getCache() {
- return this.cache;
- }
-
- /**
- * get the distributed system being used by this migration server
- *
- * @return the distributed system, or null if a system has not yet been
associated with this
- * server
- */
- public DistributedSystem getDistributedSystem() {
- return this.distributedSystem;
- }
-
- // copied from 6.0 SocketCreator
- public static boolean isLocalHost(Object host) {
- if (host instanceof InetAddress) {
- if (InetAddressUtil.LOCALHOST.equals(host)) {
- return true;
- } else {
- try {
- Enumeration<NetworkInterface> en =
NetworkInterface.getNetworkInterfaces();
- while (en.hasMoreElements()) {
- NetworkInterface i = en.nextElement();
- for (Enumeration<InetAddress> en2 = i.getInetAddresses();
en2.hasMoreElements();) {
- InetAddress addr = en2.nextElement();
- if (host.equals(addr)) {
- return true;
- }
- }
- }
- return false;
- } catch (SocketException e) {
- throw new IllegalArgumentException(
-
LocalizedStrings.InetAddressUtil_UNABLE_TO_QUERY_NETWORK_INTERFACE
- .toLocalizedString(),
- e);
- }
- }
- } else {
- return isLocalHost(toInetAddress(host.toString()));
- }
- }
-
- // copied from 6.0 SocketCreator
- public static InetAddress toInetAddress(String host) {
- if (host == null || host.length() == 0) {
- return null;
- }
- try {
- if (host.contains("/")) {
- return InetAddress.getByName(host.substring(host.indexOf('/') + 1));
- } else {
- return InetAddress.getByName(host);
- }
- } catch (UnknownHostException e) {
- throw new IllegalArgumentException(e.getMessage());
- }
- }
-
- class RequestHandler implements Runnable {
- Socket clientSocket;
- DataInputStream dis;
- DataOutputStream dos;
-
- RequestHandler(Socket clientSocket) throws IOException {
- this.clientSocket = clientSocket;
- this.dos = new DataOutputStream(this.clientSocket.getOutputStream());
- this.dis = new DataInputStream(this.clientSocket.getInputStream());
- }
-
- // for now this is a blocking operation - multithread later if necessary
- void serveClientRequest() {
- try {
- run();
- } finally {
- if (!this.clientSocket.isClosed()) {
- try {
- this.clientSocket.close();
- } catch (IOException e) {
- logger.debug(e);
- }
- }
- }
- }
-
- @Override
- public void run() {
- try {
- // first exchange version information so we can communicate correctly
- this.dos.writeShort(VERSION);
- int version = this.dis.readShort();
- handleRequest(version);
- } catch (IOException e) {
- System.err.println("Trouble dispatching request: " + e.getMessage());
- } finally {
- try {
- this.clientSocket.close();
- } catch (IOException e) {
- logger.debug("Trouble closing client socket", e);
- }
- }
- }
-
- /**
- * read and dispatch a single request on client socket
- */
- private void handleRequest(int clientVersion) {
- // for now we ignore the client version in the server. The client
- // is typically of a later release than the server, and this information
- // is given to the server in case a situation arises where it's needed
- try {
- ClientRequest req = ClientRequest.readRequest(this.clientSocket,
this.dis, this.dos);
- if (req != null) {
- System.out.println(
- "Processing " + req + " from " +
this.clientSocket.getInetAddress().getHostAddress());
- req.process(MigrationServer.this);
- this.dos.flush();
- }
- } catch (IOException e) {
- logger.debug(e);
- }
- }
- }
-
- abstract static class ClientRequest {
- Socket clientSocket;
- DataInputStream dsi;
- DataOutputStream dso;
-
- static final int REGION_REQUEST = 1;
-
- /**
- * Use readRequest to create a new request object, not this constructor.
Subclasses may refine
- * this constructor to perform other initialization
- *
- * @param dsi socket's input stream
- * @param dso socket's output stream
- * @throws IOException if there are any problems reading initialization
information
- */
- ClientRequest(Socket clientSocket, DataInputStream dsi, DataOutputStream
dso)
- throws IOException {
- this.clientSocket = clientSocket;
- this.dsi = dsi;
- this.dso = dso;
- }
-
- /**
- * Read and return a request from a client
- *
- * @param dsi socket input stream
- * @param dso socket output stream
- * @return the new request
- */
- static ClientRequest readRequest(Socket clientSocket, DataInputStream dsi,
DataOutputStream dso)
- throws IOException {
- int requestType = dsi.readShort();
- switch (requestType) {
- case REGION_REQUEST:
- return new RegionRequest(clientSocket, dsi, dso);
- }
- dso.writeShort(CODE_ERROR);
- String errorMessage = "Type of request is not implemented in this
server";
- dso.writeUTF(errorMessage);
- System.err.println("Migration server received unknown type of request ("
+ requestType
- + ") from " + clientSocket.getInetAddress().getHostAddress());
- return null;
- }
-
- void writeErrorResponse(String message) throws IOException {
- this.dso.writeShort(CODE_ERROR);
- this.dso.writeUTF(message);
- }
-
- abstract void process(MigrationServer server) throws IOException;
- }
-
- /**
- * RegionRequest represents a request for the keys and values of a Region
from a client.
- */
- static class RegionRequest extends ClientRequest {
- String regionName;
-
- RegionRequest(Socket clientSocket, DataInputStream dsi, DataOutputStream
dso)
- throws IOException {
- super(clientSocket, dsi, dso);
- this.regionName = dsi.readUTF();
- }
-
- @Override
- public String toString() {
- return "request for contents of region '" + this.regionName + '\'';
- }
-
- @Override
- void process(MigrationServer server) throws IOException {
- Cache cache = server.getCache();
- Region region = null;
- try {
- region = cache.getRegion(this.regionName);
- if (region == null) {
- String errorMessage = "Error: region " + this.regionName + " not
found in cache";
- System.err.println(errorMessage);
- writeErrorResponse(errorMessage);
- }
- } catch (IllegalArgumentException e) {
- String errorMessage = "Error: malformed region name";
- logger.warn(errorMessage, e);
- writeErrorResponse(errorMessage);
- }
- try {
- for (Object o : region.entrySet()) {
- sendEntry((Entry) o);
- }
- this.dso.writeShort(CODE_COMPLETED);
- } catch (Exception e) {
- writeErrorResponse(e.getMessage());
- }
- }
-
- private void sendEntry(Region.Entry entry) throws IOException {
- Object key = entry.getKey();
- Object value = entry.getValue();
- if (!(key instanceof Serializable)) {
- throw new IOException("Could not serialize entry for '" + key + '\'');
- }
- if (!(value instanceof Serializable)) {
- throw new IOException("Could not serialize entry for '" + key + '\'');
- }
- if (VERBOSE) {
- System.out.println("Sending " + key);
- }
- this.dso.writeShort(CODE_ENTRY);
- new ObjectOutputStream(clientSocket.getOutputStream()).writeObject(key);
- new
ObjectOutputStream(clientSocket.getOutputStream()).writeObject(value);
- }
- }
-}