Author: edwardyoon
Date: Thu Apr 9 12:15:13 2015
New Revision: 1672307
URL: http://svn.apache.org/r1672307
Log:
Fix YarnSerializePrinting and pom file for jdk8
Modified:
hama/trunk/pom.xml
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1672307&r1=1672306&r2=1672307&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Thu Apr 9 12:15:13 2015
@@ -126,6 +126,24 @@
<profiles>
<profile>
+ <id>doclint-java8-disable</id>
+ <activation>
+ <jdk>[1.8,)</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
<id>hadoop1</id>
<modules>
@@ -476,7 +494,6 @@
<configuration>
<aggregate>true</aggregate>
<outputDirectory>docs/apidocs</outputDirectory>
- <additionalparam>-Xdoclint:none</additionalparam>
</configuration>
<executions>
<execution>
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1672307&r1=1672306&r2=1672307&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Thu Apr 9 12:15:13 2015
@@ -424,7 +424,6 @@ public class YARNBSPJobClient extends BS
return files;
}
-
private void addToLocalResources(FileSystem fs, String fileSrcPath,
String fileDstPath, String fileName, Map<String, LocalResource> localResources)
throws IOException {
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java?rev=1672307&r1=1672306&r2=1672307&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YarnSerializePrinting.java Thu Apr 9 12:15:13 2015
@@ -21,39 +21,62 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hama.Constants;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
public class YarnSerializePrinting {
+ public static Path OUTPUT_PATH = new Path("/tmp/serialout");
+
public static class HelloBSP extends
- BSP<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> {
+ BSP<NullWritable, NullWritable, IntWritable, Text, NullWritable> {
public static final Log LOG = LogFactory.getLog(HelloBSP.class);
- private final static int PRINT_INTERVAL = 1000;
private int num;
@Override
public void bsp(
- BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, NullWritable> bspPeer)
+ BSPPeer<NullWritable, NullWritable, IntWritable, Text, NullWritable> bspPeer)
throws IOException, SyncException, InterruptedException {
num = bspPeer.getConfiguration().getInt("bsp.peers.num", 1);
- LOG.info(bspPeer.getAllPeerNames());
+ IntWritable peerNum = new IntWritable();
+ Text txt = new Text();
int i = 0;
for (String otherPeer : bspPeer.getAllPeerNames()) {
String peerName = bspPeer.getPeerName();
if (peerName.equals(otherPeer)) {
- LOG.info("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName);
+ peerNum.set(i);
+ txt.set("Hello BSP from " + (i + 1) + " of " + num + ": " + peerName);
+ bspPeer.write(null, txt);
}
- Thread.sleep(PRINT_INTERVAL);
bspPeer.sync();
i++;
}
}
}
+ static void printOutput(HamaConfiguration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] files = fs.listStatus(OUTPUT_PATH);
+ for (FileStatus file : files) {
+ if (file.getLen() > 0) {
+ FSDataInputStream in = fs.open(file.getPath());
+ IOUtils.copyBytes(in, System.out, conf, false);
+ in.close();
+ }
+ }
+
+ //fs.delete(OUTPUT_PATH, true);
+ }
+
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
HamaConfiguration conf = new HamaConfiguration();
@@ -63,9 +86,17 @@ public class YarnSerializePrinting {
job.setJarByClass(HelloBSP.class);
job.setJobName("Serialize Printing");
job.setInputFormat(NullInputFormat.class);
- job.setOutputFormat(NullOutputFormat.class);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(IntWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputPath(OUTPUT_PATH);
job.setMemoryUsedPerTaskInMb(100);
job.setNumBspTask(4);
+
+ long startTime = System.currentTimeMillis();
job.waitForCompletion(true);
+ printOutput(conf);
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}