Author: aching
Date: Thu Sep  8 18:47:19 2011
New Revision: 1166854

URL: http://svn.apache.org/viewvc?rev=1166854&view=rev
Log:
GIRAPH-25 NPE in BspServiceMaster when failing a job (committed by
aching on behalf of dvryaboy).


Added:
    
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1166854&r1=1166853&r2=1166854&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Thu Sep  8 18:47:19 2011
@@ -1,25 +1,28 @@
 Giraph Change Log
 
 Release 0.70.0 - unreleased
+ 
+  GIRAPH-25 NPE in BspServiceMaster when failing a job (aching on behalf 
+  of dvryaboy)
 
-  GIRAPH-24. Job-level statistics reports one superstep greater than 
+  GIRAPH-24 Job-level statistics reports one superstep greater than 
   workers. (jghoman)
   
-  GIRAPH-18. Refactor BspServiceWorker::loadVertices(). (jghoman)
+  GIRAPH-18 Refactor BspServiceWorker::loadVertices(). (jghoman)
   
-  GIRAPH-14. Support for the Facebook Hadoop branch. (aching)
+  GIRAPH-14 Support for the Facebook Hadoop branch. (aching)
 
-  GIRAPH-16. Add Apache RAT to the verify build step. (omalley)
+  GIRAPH-16 Add Apache RAT to the verify build step. (omalley)
 
-  GIRAPH-17 - Giraph doesn't give up properly after the maximum connect
+  GIRAPH-17 Giraph doesn't give up properly after the maximum connect
   attempts to ZooKeeper. (aching)
 
-  GIRAPH-2: Make the project homepage. (jghoman)
+  GIRAPH-2 Make the project homepage. (jghoman)
 
-  GIRAPH-9: Change Yahoo License Header to Apache License Header (hyunsik)
+  GIRAPH-9 Change Yahoo License Header to Apache License Header (hyunsik)
 
-  GIRAPH-6. Remove Yahoo-specific code from pom.xml. (jghoman)
+  GIRAPH-6 Remove Yahoo-specific code from pom.xml. (jghoman)
 
   GIRAPH-5 Remove Yahoo directories after svn import from Yahoo! (aching)
 
-  GIRAPH-3. Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file
+  GIRAPH-3 Vertex:sentMsgToAllEdges should be sendMsg. (jghoman)
\ No newline at end of file

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java?rev=1166854&r1=1166853&r2=1166854&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
 Thu Sep  8 18:47:19 2011
@@ -46,7 +46,7 @@ public interface CentralizedServiceMaste
      * VertexInputFormat.  These InputSplits will be split further into
      * partitions by the workers.
      *
-     * @return number of partitions
+     * @return number of partitions. Returns -1 on failure to create valid 
input splits.
      */
     int createInputSplits();
 

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1166854&r1=1166853&r2=1166854&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
 Thu Sep  8 18:47:19 2011
@@ -408,15 +408,18 @@ public class BspServiceMaster<
             ++pollAttempt;
         }
         if (failJob) {
-            LOG.warn("checkWorkers: Did not receive enough processes in " +
-                     "time (only " + totalResponses + " of " +
-                     minWorkers + " required)");
+            LOG.error("checkWorkers: Did not receive enough processes in " +
+                      "time (only " + totalResponses + " of " +
+                      minWorkers + " required).  This occurs if you do not " +
+                      "have enough map tasks available simultaneously on " +
+                      "your Hadoop instance to fulfill the number of " +
+                      "requested workers.");
             return null;
         }
 
         if (healthyWorkerList.size() < minWorkers) {
-            LOG.warn("checkWorkers: Only " + healthyWorkerList.size() +
-                     " available when " + minWorkers + " are required.");
+            LOG.error("checkWorkers: Only " + healthyWorkerList.size() +
+                      " available when " + minWorkers + " are required.");
             return null;
         }
 
@@ -450,6 +453,7 @@ public class BspServiceMaster<
         return workerHostnamePortMap;
     }
 
+    @Override
     public int createInputSplits() {
         // Only the 'master' should be doing this.  Wait until the number of
         // processes that have reported health exceeds the minimum percentage.
@@ -481,6 +485,7 @@ public class BspServiceMaster<
         Map<String, JSONArray> healthyWorkerHostnamePortMap = checkWorkers();
         if (healthyWorkerHostnamePortMap == null) {
             setJobState(ApplicationState.FAILED, -1, -1);
+            return -1;
         }
 
         List<InputSplit> splitList =
@@ -726,6 +731,7 @@ public class BspServiceMaster<
         }
     }
 
+    @Override
     public void setup() {
         // Might have to manually load a checkpoint.
         // In that case, the input splits are not set, they will be faked by
@@ -750,6 +756,7 @@ public class BspServiceMaster<
         }
     }
 
+    @Override
     public boolean becomeMaster() {
         // Create my bid to become the master, then try to become the worker
         // or return false.

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1166854&r1=1166853&r2=1166854&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java 
Thu Sep  8 18:47:19 2011
@@ -89,61 +89,61 @@ public class MasterThread<I extends Writ
             long endMillis = 0;
             bspServiceMaster.setup();
             if (bspServiceMaster.becomeMaster() == true) {
-                if (bspServiceMaster.getRestartedSuperstep() ==
-                        BspService.UNSET_SUPERSTEP) {
-                    bspServiceMaster.createInputSplits();
-                }
-                long setupMillis = (System.currentTimeMillis() - startMillis);
-                context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                                   "Setup (milliseconds)").
-                                   increment(setupMillis);
-                setupSecs = setupMillis / 1000.0d;
-                SuperstepState superstepState = SuperstepState.INITIAL;
-                long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-                while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
-                    long startSuperstepMillis = System.currentTimeMillis();
-                    cachedSuperstep = bspServiceMaster.getSuperstep();
-                    superstepState = bspServiceMaster.coordinateSuperstep();
-                    long superstepMillis = System.currentTimeMillis() -
-                        startSuperstepMillis;
-                    superstepSecsMap.put(new Long(cachedSuperstep),
-                                           superstepMillis / 1000.0d);
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("masterThread: Coordination of superstep " +
-                                 cachedSuperstep + " took " +
-                                 superstepMillis / 1000.0d +
-                                 " seconds ended with state " + superstepState 
+
-                                 " and is now on superstep " +
-                                 bspServiceMaster.getSuperstep());
-                    }
-                    if (superstepCounterOn) {
-                        String counterPrefix;
-                        if (cachedSuperstep == -1) {
-                            counterPrefix = "Vertex input superstep";
-                        } else {
-                            counterPrefix = "Superstep " + cachedSuperstep;
+                // Attempt to create InputSplits if necessary. Bail out if 
that fails.
+                if (bspServiceMaster.getRestartedSuperstep() != 
BspService.UNSET_SUPERSTEP
+                        || bspServiceMaster.createInputSplits() != -1) {
+                    long setupMillis = (System.currentTimeMillis() - 
startMillis);
+                    context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+                            "Setup (milliseconds)").
+                            increment(setupMillis);
+                    setupSecs = setupMillis / 1000.0d;
+                    SuperstepState superstepState = SuperstepState.INITIAL;
+                    long cachedSuperstep = BspService.UNSET_SUPERSTEP;
+                    while (superstepState != 
SuperstepState.ALL_SUPERSTEPS_DONE) {
+                        long startSuperstepMillis = System.currentTimeMillis();
+                        cachedSuperstep = bspServiceMaster.getSuperstep();
+                        superstepState = 
bspServiceMaster.coordinateSuperstep();
+                        long superstepMillis = System.currentTimeMillis() -
+                                startSuperstepMillis;
+                        superstepSecsMap.put(new Long(cachedSuperstep),
+                                superstepMillis / 1000.0d);
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("masterThread: Coordination of superstep 
" +
+                                    cachedSuperstep + " took " +
+                                    superstepMillis / 1000.0d +
+                                    " seconds ended with state " + 
superstepState +
+                                    " and is now on superstep " +
+                                    bspServiceMaster.getSuperstep());
+                        }
+                        if (superstepCounterOn) {
+                            String counterPrefix;
+                            if (cachedSuperstep == -1) {
+                                counterPrefix = "Vertex input superstep";
+                            } else {
+                                counterPrefix = "Superstep " + cachedSuperstep;
+                            }
+                            
context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+                                    counterPrefix +
+                                    " (milliseconds)").
+                                    increment(superstepMillis);
                         }
-                        context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                                           counterPrefix +
-                                           " (milliseconds)").
-                                           increment(superstepMillis);
-                    }
 
-                    // If a worker failed, restart from a known good superstep
-                    if (superstepState == SuperstepState.WORKER_FAILURE) {
-                        bspServiceMaster.restartFromCheckpoint(
-                            bspServiceMaster.getLastGoodCheckpoint());
+                        // If a worker failed, restart from a known good 
superstep
+                        if (superstepState == SuperstepState.WORKER_FAILURE) {
+                            bspServiceMaster.restartFromCheckpoint(
+                                    bspServiceMaster.getLastGoodCheckpoint());
+                        }
+                        endMillis = System.currentTimeMillis();
                     }
-                    endMillis = System.currentTimeMillis();
+                    bspServiceMaster.setJobState(ApplicationState.FINISHED, 
-1, -1);
                 }
-                bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, 
-1);
             }
             bspServiceMaster.cleanup();
             if (!superstepSecsMap.isEmpty()) {
                 context.getCounter(
-                    GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                    "Shutdown (milliseconds)").
-                    increment(System.currentTimeMillis() - endMillis);
+                        GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+                        "Shutdown (milliseconds)").
+                        increment(System.currentTimeMillis() - endMillis);
                 if (LOG.isInfoEnabled()) {
                     LOG.info("setup: Took " + setupSecs + " seconds.");
                 }

Added: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java?rev=1166854&view=auto
==============================================================================
--- 
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
 (added)
+++ 
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestNotEnoughMapTasks.java
 Thu Sep  8 18:47:19 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import java.io.IOException;
+
+import org.apache.giraph.examples.SimpleCheckpointVertex;
+import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.fs.Path;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for not enough map tasks
+ */
+public class TestNotEnoughMapTasks extends BspCase {
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public TestNotEnoughMapTasks(String testName) {
+        super(testName);
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite() {
+        return new TestSuite(TestNotEnoughMapTasks.class);
+    }
+
+    /**
+     * This job should always fail gracefully with not enough map tasks.
+     *
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    public void testNotEnoughMapTasks()
+            throws IOException, InterruptedException, ClassNotFoundException {
+        if (getJobTracker() == null) {
+            System.out.println(
+                "testNotEnoughMapTasks: Ignore this test in local mode.");
+            return;
+        }
+        GiraphJob job = new GiraphJob(getCallingMethodName());
+        setupConfiguration(job);
+        // An unlikely impossible number of workers to achieve
+        final int unlikelyWorkers = Short.MAX_VALUE;
+        job.setWorkerConfiguration(
+            unlikelyWorkers, unlikelyWorkers, 100.0f);
+        // Only one poll attempt of one second to make failure faster
+        job.getConfiguration().setInt(GiraphJob.POLL_ATTEMPTS, 1);
+        job.getConfiguration().setInt(GiraphJob.POLL_MSECS, 1);
+        job.setVertexClass(SimpleCheckpointVertex.class);
+        job.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
+        
job.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
+        Path outputPath = new Path("/tmp/" + getCallingMethodName());
+        removeAndSetOutput(job, outputPath);
+        assertFalse(job.run(false));
+    }
+}


Reply via email to