Author: edwardyoon
Date: Wed Mar 19 06:58:33 2014
New Revision: 1579153
URL: http://svn.apache.org/r1579153
Log:
HAMA-893: Replace UTF8 in RPC class
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/RPC.java
hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java?rev=1579153&r1=1579152&r2=1579153&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
Wed Mar 19 06:58:33 2014
@@ -59,6 +59,7 @@ public class SpilledDataInputBuffer exte
private long bytesWrittenInFile_;
private SpilledDataReadStatus status_;
private boolean closed_;
+ private RandomAccessFile raf;
/**
* Creates the thread to read the contents of the file and loads into the
@@ -85,7 +86,7 @@ public class SpilledDataInputBuffer exte
* @throws IOException
*/
private void keepReadingFromFile() throws IOException {
- RandomAccessFile raf = new RandomAccessFile(fileName, "r");
+ raf = new RandomAccessFile(fileName, "r");
FileChannel fc = raf.getChannel();
bytesToRead_ = fc.size();
bytesWrittenInFile_ = bytesToRead_;
@@ -116,6 +117,7 @@ public class SpilledDataInputBuffer exte
} while (!closed_ && bytesToRead_ > 0 && fileReadIndex >= 0
&& fileReadIndex < bufferList_.size());
fc.close();
+ raf.close();
closed_ = true;
status_.closedBySpiller();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java?rev=1579153&r1=1579152&r2=1579153&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
Wed Mar 19 06:58:33 2014
@@ -44,11 +44,10 @@ public class SyncReadByteBufferInputStre
public SyncReadByteBufferInputStream(boolean isSpilled, String fileName) {
spilled = isSpilled;
if (isSpilled) {
- RandomAccessFile f;
try {
- f = new RandomAccessFile(fileName, "r");
- fileChannel = f.getChannel();
+ fileChannel = new RandomAccessFile(fileName, "r").getChannel();
fileBytesToRead = fileChannel.size();
+
} catch (FileNotFoundException e) {
LOG.error("File not found initializing Synchronous Input Byte Stream",
e);
@@ -57,7 +56,6 @@ public class SyncReadByteBufferInputStre
LOG.error("Error initializing Synchronous Input Byte Stream", e);
throw new RuntimeException(e);
}
-
}
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1579153&r1=1579152&r2=1579153&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
Wed Mar 19 06:58:33 2014
@@ -44,14 +44,12 @@ public class WriteSpilledDataProcessor i
}
private void initializeFileChannel() {
- FileOutputStream stream;
try {
- stream = new FileOutputStream(new File(fileName), true);
+ fileChannel = new FileOutputStream(new File(fileName),
true).getChannel();
} catch (FileNotFoundException e) {
LOG.error("Error opening file to write spilled data.", e);
throw new RuntimeException(e);
}
- fileChannel = stream.getChannel();
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/RPC.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/RPC.java?rev=1579153&r1=1579152&r2=1579153&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/RPC.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/RPC.java Wed Mar 19
06:58:33 2014
@@ -39,7 +39,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
@@ -75,14 +75,15 @@ public class RPC {
} // no public ctor
/** A method invocation, including the method name and its parameters. */
+ @SuppressWarnings("rawtypes")
private static class Invocation implements Writable, Configurable {
private String methodName;
private Class[] parameterClasses;
private Object[] parameters;
private Configuration conf;
- public Invocation() {
- }
+ @SuppressWarnings("unused")
+ public Invocation() { }
public Invocation(Method method, Object[] parameters) {
this.methodName = method.getName();
@@ -106,7 +107,7 @@ public class RPC {
}
public void readFields(DataInput in) throws IOException {
- methodName = UTF8.readString(in);
+ methodName = Text.readString(in);
parameters = new Object[in.readInt()];
parameterClasses = new Class[parameters.length];
ObjectWritable objectWritable = new ObjectWritable();
@@ -118,7 +119,7 @@ public class RPC {
}
public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, methodName);
+ Text.writeString(out, methodName);
out.writeInt(parameterClasses.length);
for (int i = 0; i < parameterClasses.length; i++) {
ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
@@ -256,6 +257,7 @@ public class RPC {
/**
* A version mismatch for the RPC protocol.
*/
+ @SuppressWarnings("serial")
public static class VersionMismatch extends IOException {
private String interfaceName;
private long clientVersion;
Modified: hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java?rev=1579153&r1=1579152&r2=1579153&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/ProgramDriver.java Wed
Mar 19 06:58:33 2014
@@ -95,6 +95,7 @@ public class ProgramDriver {
* @throws NoSuchMethodException
* @throws SecurityException
*/
+ @SuppressWarnings("rawtypes")
public void addClass (String name, Class mainClass, String description)
throws Throwable {
programs.put(name , new ProgramDescription(mainClass, description));
}