Author: edwardyoon
Date: Thu May 14 01:48:38 2015
New Revision: 1679303
URL: http://svn.apache.org/r1679303
Log:
HAMA-956: Support force-setting the no. of tasks
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu May
14 01:48:38 2015
@@ -45,7 +45,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
@@ -58,11 +57,6 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.OutgoingMessageManager;
-import org.apache.hama.bsp.message.OutgoingPOJOMessageBundle;
-import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.JobSubmissionProtocol;
import org.apache.hama.ipc.RPC;
@@ -349,24 +343,32 @@ public class BSPJobClient extends Config
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- InputSplit[] splits = job.getInputFormat().getSplits(job, (maxTasks >
configured) ? configured : maxTasks);
-
+ InputSplit[] splits = job.getInputFormat().getSplits(job,
+ (maxTasks > configured) ? configured : maxTasks);
+
if (maxTasks < splits.length) {
throw new IOException(
"Job failed! The number of splits has exceeded the number of max
tasks. The number of splits: "
+ splits.length + ", The number of max tasks: " + maxTasks);
}
-
- /*
- job = partition(job, splits, maxTasks);
- maxTasks = job.getInt("hama.partition.count", maxTasks);
- if (job.getBoolean("input.has.partitioned", false)) {
- splits = job.getInputFormat().getSplits(job, maxTasks);
+ /*
+ FIXME now graph job doesn't use this runtime input partitioning
+ Should we support this feature at BSP framework level?
+
+
if(job.getConfiguration().getBoolean(Constants.ENABLE_RUNTIME_PARTITIONING,
false)) {
+ job = partition(job, splits, maxTasks);
+ maxTasks = job.getInt("hama.partition.count", maxTasks);
}
*/
+
+ int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
+ if (numOfSplits > configured
+ || !job.getConfiguration().getBoolean("hama.force.set.bsp.tasks",
+ false)) {
+ job.setNumBspTask(numOfSplits);
+ }
- job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -407,105 +409,6 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
- protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
- throws IOException {
- String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
-
- Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
- if (fs.exists(partitionDir)) {
- fs.delete(partitionDir, true);
- }
-
- if (job.get("bsp.partitioning.runner.job") != null) {
- return job;
- }// Early exit for the partitioner job.
-
- if (inputPath != null) {
- int numSplits = splits.length;
- int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
- if (LOG.isDebugEnabled()) {
- LOG.debug(" numTasks = "
- + numTasks
- + " numSplits = "
- + numSplits
- + " enable = "
- + (job.getConfiguration().getBoolean(
- Constants.ENABLE_RUNTIME_PARTITIONING, false)
- + " class = " + job.getConfiguration().get(
- Constants.RUNTIME_PARTITIONING_CLASS)));
- }
-
- if (numTasks == 0) {
- numTasks = numSplits;
- }
-
- if (job.getConfiguration().getBoolean(
- Constants.ENABLE_RUNTIME_PARTITIONING, false)
- && job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_CLASS)
!= null) {
-
- HamaConfiguration conf = new HamaConfiguration(job.getConfiguration());
-
- conf.setInt(Constants.RUNTIME_DESIRED_PEERS_COUNT, numTasks);
- if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) !=
null) {
- conf.set(Constants.RUNTIME_PARTITIONING_DIR, job.getConfiguration()
- .get(Constants.RUNTIME_PARTITIONING_DIR));
- }
-
- conf.set(Constants.RUNTIME_PARTITIONING_CLASS,
- job.get(Constants.RUNTIME_PARTITIONING_CLASS));
- BSPJob partitioningJob = new BSPJob(conf);
- partitioningJob.setJobName("Runtime partitioning job for "
- + partitioningJob.getJobName());
- LOG.debug("partitioningJob input: "
- + partitioningJob.get(Constants.JOB_INPUT_DIR));
-
-
partitioningJob.getConfiguration().setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
- OutgoingPOJOMessageBundle.class, OutgoingMessageManager.class);
-
partitioningJob.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- SortedMemoryQueue.class, MessageQueue.class);
-
- partitioningJob.setInputFormat(job.getInputFormat().getClass());
- partitioningJob.setInputKeyClass(job.getInputKeyClass());
- partitioningJob.setInputValueClass(job.getInputValueClass());
-
- partitioningJob.setOutputFormat(SequenceFileOutputFormat.class);
- partitioningJob.setOutputKeyClass(job.getInputKeyClass());
- partitioningJob.setOutputValueClass(job.getInputValueClass());
-
- partitioningJob.setBspClass(PartitioningRunner.class);
- partitioningJob.setMessageClass(MapWritable.class);
- partitioningJob.set("bsp.partitioning.runner.job", "true");
- partitioningJob.getConfiguration().setBoolean(
- Constants.ENABLE_RUNTIME_PARTITIONING, false);
- partitioningJob.setOutputPath(partitionDir);
-
- boolean isPartitioned = false;
- try {
- isPartitioned = partitioningJob.waitForCompletion(true);
- } catch (InterruptedException e) {
- LOG.error("Interrupted partitioning run-time.", e);
- } catch (ClassNotFoundException e) {
- LOG.error("Class not found error partitioning run-time.", e);
- }
-
- if (isPartitioned) {
- if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR)
!= null) {
- job.setInputPath(new Path(conf
- .get(Constants.RUNTIME_PARTITIONING_DIR)));
- } else {
- job.setInputPath(partitionDir);
- }
- job.setBoolean("input.has.partitioned", true);
- job.setInputFormat(NonSplitSequenceFileInputFormat.class);
- } else {
- LOG.error("Error partitioning the input path.");
- throw new IOException("Runtime partition failed for the job.");
- }
- }
- }
- return job;
- }
-
protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
Path submitJobFile, FileSystem fs) throws IOException {
//
@@ -569,17 +472,6 @@ public class BSPJobClient extends Config
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
for (InputSplit split : splits) {
-
- /*
- // set partitionID to rawSplit
- if (split.getClass().getName().equals(FileSplit.class.getName())) {
- LOG.debug(((FileSplit) split).getPath().getName());
- String[] extractPartitionID = ((FileSplit) split).getPath().getName()
- .split("[-]");
- rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
- }
- */
-
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
split.write(buffer);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu May
14 01:48:38 2015
@@ -630,11 +630,17 @@ public final class BSPPeerImpl<K1, V1, K
@Override
public final boolean readNext(K1 key, V1 value) throws IOException {
- return in.next(key, value);
+ if(in != null)
+ return in.next(key, value);
+ else
+ return false;
}
@Override
public final KeyValuePair<K1, V1> readNext() throws IOException {
+ if (split == null)
+ return null;
+
K1 k = in.createKey();
V1 v = in.createValue();
if (in.next(k, v)) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu
May 14 01:48:38 2015
@@ -262,15 +262,21 @@ public class JobInProgress {
} finally {
splitFile.close();
}
- numBSPTasks = splits.length;
- LOG.info("num BSPTasks: " + numBSPTasks);
+ LOG.debug("numBSPTasks: " + numBSPTasks + ", splits.length: "
+ + splits.length);
// adjust number of BSP tasks to actual number of splits
this.tasks = new TaskInProgress[numBSPTasks];
- for (int i = 0; i < numBSPTasks; i++) {
+ for (int i = 0; i < splits.length; i++) {
tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
splits[i], this.conf, this, i);
}
+
+ for (int i = splits.length; i < numBSPTasks; i++) {
+ tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
+ null, this.conf, this, i);
+ }
+
} else {
this.tasks = new TaskInProgress[numBSPTasks];
for (int i = 0; i < numBSPTasks; i++) {
@@ -329,7 +335,9 @@ public class JobInProgress {
}
/**
- * Gets the task for the first <code>TaskInProgress</code> that is not
already running
+ * Gets the task for the first <code>TaskInProgress</code> that is not
already
+ * running
+ *
* @param groomStatuses Map of statuses for available groom servers
* @return
*/
@@ -357,6 +365,7 @@ public class JobInProgress {
/**
* Creates a new task based on the provided <code>TaskInProgress</code>
+ *
* @param task The <code>TaskInProgress</code> for which to create a task
* @param groomStatuses The statuses of available groom servers
* @param resources Available resources of the given groom server
@@ -367,9 +376,8 @@ public class JobInProgress {
Task result = null;
String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
groomStatuses, taskCountInGroomMap, resources, task);
- GroomServerStatus groomStatus = taskAllocationStrategy
- .getGroomToAllocate(groomStatuses, selectedGrooms,
- taskCountInGroomMap, resources, task);
+ GroomServerStatus groomStatus = taskAllocationStrategy.getGroomToAllocate(
+ groomStatuses, selectedGrooms, taskCountInGroomMap, resources, task);
if (groomStatus != null) {
result = task.constructTask(groomStatus);
} else if (LOG.isDebugEnabled()) {
@@ -381,19 +389,20 @@ public class JobInProgress {
}
return result;
}
-
+
/**
* Get the ideal grooms on which to run a given task for the provided
* constraints
+ *
* @return
*/
- public String[] getPreferredGrooms(TaskInProgress task,
+ public String[] getPreferredGrooms(TaskInProgress task,
Map<String, GroomServerStatus> groomStatuses, BSPResource[] resources) {
- String[] grooms = taskAllocationStrategy.selectGrooms(
- groomStatuses, taskCountInGroomMap, resources, task);
+ String[] grooms = taskAllocationStrategy.selectGrooms(groomStatuses,
+ taskCountInGroomMap, resources, task);
return grooms;
}
-
+
public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
Map<GroomServerStatus, List<GroomServerAction>> actionMap)
throws IOException {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu
May 14 01:48:38 2015
@@ -241,7 +241,7 @@ public class LocalBSPRunner implements J
String splitname = null;
BytesWritable realBytes = null;
- if (splits != null) {
+ if (splits != null && splits.length > id) {
splitname = splits[id].getClassName();
realBytes = splits[id].getBytes();
}
@@ -349,9 +349,9 @@ public class LocalBSPRunner implements J
@Override
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
-
//peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
- // bundle.getLength());
-
+ //
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
+ // bundle.getLength());
+
MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
bundle.size());
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestKeyValueTextInputFormat.java
Thu May 14 01:48:38 2015
@@ -163,6 +163,8 @@ public class TestKeyValueTextInputFormat
job.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
job.setPartitioner(HashPartitioner.class);
+ // FIXME see 362 line at BSPJobClient.java
+ job.setNumBspTask(1);
job.setInputPath(dataPath);
job.setInputFormat(KeyValueTextInputFormat.class);
job.setInputKeyClass(Text.class);
@@ -173,10 +175,6 @@ public class TestKeyValueTextInputFormat
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
- BSPJobClient jobClient = new BSPJobClient(conf);
- ClusterStatus cluster = jobClient.getClusterStatus(true);
- job.setNumBspTask(cluster.getMaxTasks());
-
assertEquals(true, job.waitForCompletion(true));
} catch (Exception e) {
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Thu May 14 01:48:38 2015
@@ -56,7 +56,7 @@ public class PageRankTest extends TestCa
generateTestData();
try {
PageRank.main(new String[] { "-input_path", INPUT, "-output_path",
- OUTPUT, "-task_num", "3", "-f", "json" });
+ OUTPUT, "-task_num", "5", "-f", "json" });
verifyResult();
} catch (ParseException e) {
e.printStackTrace();
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu May
14 01:48:38 2015
@@ -58,7 +58,8 @@ public class GraphJob extends BSPJob {
super(conf);
conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
-
+ conf.setBoolean("hama.bsp.force.set.tasks", true);
+
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
this.setVertexIDClass(Text.class);
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Thu May 14 01:48:38 2015
@@ -84,6 +84,8 @@ public class TestSubmitGraphJob extends
// set the defaults
bsp.setMaxIteration(30);
+ bsp.setNumBspTask(2);
+
bsp.setCompressionCodec(Bzip2Compressor.class);
bsp.setAggregatorClass(AverageAggregator.class);
Modified:
hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java?rev=1679303&r1=1679302&r2=1679303&view=diff
==============================================================================
---
hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java
(original)
+++
hama/trunk/ml/src/test/java/org/apache/hama/ml/recommendation/TestOnlineCF.java
Thu May 14 01:48:38 2015
@@ -31,41 +31,38 @@ import org.apache.hama.ml.recommendation
import org.apache.hama.ml.recommendation.cf.function.MeanAbsError;
import org.junit.Test;
-public class TestOnlineCF extends TestCase{
+public class TestOnlineCF extends TestCase {
@SuppressWarnings({ "deprecation", "rawtypes", "unchecked" })
@Test
public void testOnlineCF() {
- Preference[] train_prefs = {
- new Preference<Integer, Integer>(1, 1, 4),
- new Preference<Integer, Integer>(1, 2, 2.5),
- new Preference<Integer, Integer>(1, 3, 3.5),
- new Preference<Integer, Integer>(1, 4, 1),
- new Preference<Integer, Integer>(1, 5, 3.5),
- new Preference<Integer, Integer>(2, 1, 4),
- new Preference<Integer, Integer>(2, 2, 2.5),
- new Preference<Integer, Integer>(2, 3, 3.5),
- new Preference<Integer, Integer>(2, 4, 1),
- new Preference<Integer, Integer>(2, 5, 3.5),
- new Preference<Integer, Integer>(3, 1, 4),
- new Preference<Integer, Integer>(3, 2, 2.5),
- new Preference<Integer, Integer>(3, 3, 3.5)};
- Preference[] test_prefs = {
- new Preference<Integer, Integer>(1, 3, 3.5),
- new Preference<Integer, Integer>(2, 4, 1),
- new Preference<Integer, Integer>(3, 4, 1),
- new Preference<Integer, Integer>(3, 5, 3.5)
- };
-
+ Preference[] train_prefs = { new Preference<Integer, Integer>(1, 1, 4),
+ new Preference<Integer, Integer>(1, 2, 2.5),
+ new Preference<Integer, Integer>(1, 3, 3.5),
+ new Preference<Integer, Integer>(1, 4, 1),
+ new Preference<Integer, Integer>(1, 5, 3.5),
+ new Preference<Integer, Integer>(2, 1, 4),
+ new Preference<Integer, Integer>(2, 2, 2.5),
+ new Preference<Integer, Integer>(2, 3, 3.5),
+ new Preference<Integer, Integer>(2, 4, 1),
+ new Preference<Integer, Integer>(2, 5, 3.5),
+ new Preference<Integer, Integer>(3, 1, 4),
+ new Preference<Integer, Integer>(3, 2, 2.5),
+ new Preference<Integer, Integer>(3, 3, 3.5) };
+ Preference[] test_prefs = { new Preference<Integer, Integer>(1, 3, 3.5),
+ new Preference<Integer, Integer>(2, 4, 1),
+ new Preference<Integer, Integer>(3, 4, 1),
+ new Preference<Integer, Integer>(3, 5, 3.5) };
+
Random rnd = new Random();
Long num = Long.valueOf(rnd.nextInt(100000));
String fileName = "onlinecf_train" + num.toString();
String outputFileName = "onlinecf_model" + num.toString();
-
+
Configuration fsConf = new Configuration();
String strDataPath = "/tmp/" + fileName;
String convertedFileName = "/tmp/converted_" + fileName;
Path dataPath = new Path(strDataPath);
-
+
try {
URI uri = new URI(strDataPath);
FileSystem fs = FileSystem.get(uri, fsConf);
@@ -83,10 +80,11 @@ public class TestOnlineCF extends TestCa
}
fileOut.writeBytes(str.toString());
fileOut.close();
-
+
MovieLensConverter converter = new MovieLensConverter();
- assertEquals(true, converter.convert(strDataPath, null,
convertedFileName));
-
+ assertEquals(true,
+ converter.convert(strDataPath, null, convertedFileName));
+
OnlineCF recommender = new OnlineCF();
recommender.setInputPreferences(convertedFileName);
recommender.setIteration(150);
@@ -101,11 +99,12 @@ public class TestOnlineCF extends TestCa
int correct = 0;
for (Preference<Integer, Integer> test : test_prefs) {
double actual = test.getValue().get();
- double estimated = recommender.estimatePreference(test.getUserId(),
test.getItemId());
- correct += (Math.abs(actual-estimated)<0.5)?1:0;
+ double estimated = recommender.estimatePreference(test.getUserId(),
+ test.getItemId());
+ correct += (Math.abs(actual - estimated) < 0.5) ? 1 : 0;
}
- assertEquals(test_prefs.length*0.75, correct, 1);
+ assertEquals(test_prefs.length * 0.75, correct, 1);
fs.delete(new Path(outputFileName));
fs.delete(new Path(strDataPath));