GIRAPH-944: Improve job tracking on command line
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4485e563 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4485e563 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4485e563 Branch: refs/heads/release-1.1 Commit: 4485e563a6582afb1c848ea80888fdad50ada516 Parents: de0efb0 Author: Maja Kabiljo <[email protected]> Authored: Tue Aug 26 11:35:14 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Aug 26 14:42:52 2014 -0700 ---------------------------------------------------------------------- giraph-core/pom.xml | 24 +- .../java/org/apache/giraph/bsp/BspService.java | 12 +- .../apache/giraph/bsp/CentralizedService.java | 9 + .../org/apache/giraph/conf/GiraphConstants.java | 2 +- .../apache/giraph/graph/GraphTaskManager.java | 30 +++ .../giraph/graph/JobProgressTrackerClient.java | 33 +++ .../graph/JobProgressTrackerClientNoOp.java | 47 ++++ .../RetryableJobProgressTrackerClient.java | 175 ++++++++++++++ .../java/org/apache/giraph/job/GiraphJob.java | 16 +- .../apache/giraph/job/JobProgressTracker.java | 155 +++---------- .../giraph/job/JobProgressTrackerService.java | 193 +++++++++++++++ .../apache/giraph/master/BspServiceMaster.java | 10 +- .../org/apache/giraph/master/MasterCompute.java | 30 ++- .../apache/giraph/scripting/ScriptLoader.java | 6 +- .../java/org/apache/giraph/utils/FileUtils.java | 2 +- .../apache/giraph/worker/BspServiceWorker.java | 3 +- .../org/apache/giraph/worker/WorkerContext.java | 10 + .../apache/giraph/worker/WorkerProgress.java | 232 ++++++++++--------- .../giraph/worker/WorkerProgressWriter.java | 30 +-- .../org/apache/giraph/zk/ZooKeeperManager.java | 4 +- .../test/java/org/apache/giraph/BspCase.java | 4 +- giraph-examples/pom.xml | 8 - .../java/org/apache/giraph/TestBspBasic.java | 4 +- .../giraph/hive/jython/HiveJythonUtils.java | 2 +- pom.xml | 81 ++++++- 25 files changed, 825 insertions(+), 297 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml index b66ba1d..23f6666 100644 --- a/giraph-core/pom.xml +++ b/giraph-core/pom.xml @@ -289,14 +289,6 @@ under the License. </plugin> </plugins> </build> - <dependencies> - <dependency> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - <version>${dep.oldnetty.version}</version> - <scope>test</scope> - </dependency> - </dependencies> </profile> <profile> @@ -467,6 +459,22 @@ under the License. <dependencies> <!-- compile dependencies. sorted lexicographically. --> <dependency> + <groupId>com.facebook.nifty</groupId> + <artifactId>nifty-client</artifactId> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-annotations</artifactId> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-codec</artifactId> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-service</artifactId> + </dependency> + <dependency> <groupId>com.facebook.thirdparty.yourkit-api</groupId> <artifactId>yjp-controller-api-redist</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index c418a89..2a50489 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphTaskManager; import 
org.apache.giraph.graph.InputSplitEvents; import org.apache.giraph.graph.InputSplitPaths; +import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.utils.CheckpointingUtils; import org.apache.giraph.worker.WorkerInfo; @@ -161,8 +162,6 @@ public abstract class BspService<I extends WritableComparable, "/_partitionExchangeDir"; /** Denotes that the superstep is done */ public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; - /** Stores progress info for workers */ - public static final String WORKER_PROGRESSES = "/_workerProgresses"; /** Denotes that computation should be halted */ public static final String HALT_COMPUTATION_NODE = "/_haltComputation"; /** User sets this flag to checkpoint and stop the job */ @@ -241,8 +240,6 @@ public abstract class BspService<I extends WritableComparable, protected final String savedCheckpointBasePath; /** Path to the master election path */ protected final String masterElectionPath; - /** Stores progress info of this worker */ - protected final String myProgressPath; /** If this path exists computation will be halted */ protected final String haltComputationPath; /** Private ZooKeeper instance that implements the service */ @@ -363,7 +360,6 @@ public abstract class BspService<I extends WritableComparable, getCheckpointBasePath(getConfiguration(), getJobId()); masterElectionPath = basePath + MASTER_ELECTION_DIR; - myProgressPath = basePath + WORKER_PROGRESSES + "/" + taskPartition; String serverPortList = conf.getZookeeperList(); haltComputationPath = basePath + HALT_COMPUTATION_NODE; getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, @@ -404,7 +400,6 @@ public abstract class BspService<I extends WritableComparable, "BspService: Invalid superstep to restart - " + restartedSuperstep); } - } /** @@ -1258,6 +1253,11 @@ public abstract class BspService<I extends WritableComparable, return lastCheckpointedSuperstep; } 
+ @Override + public JobProgressTracker getJobProgressTracker() { + return getGraphTaskManager().getJobProgressTracker(); + } + /** * Only get the finalized checkpoint files */ http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java index 560f1fb..0cadfb7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedService.java @@ -19,6 +19,8 @@ package org.apache.giraph.bsp; import java.util.List; + +import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.worker.WorkerInfo; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -56,4 +58,11 @@ public interface CentralizedService<I extends WritableComparable, * @return List of workers */ List<WorkerInfo> getWorkerInfoList(); + + /** + * Get JobProgressTracker to report progress to + * + * @return JobProgressTracker + */ + JobProgressTracker getJobProgressTracker(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index da0a8db..d1fdf57 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -1119,7 +1119,7 @@ public interface GiraphConstants { /** Whether to track job progress on client or not */ BooleanConfOption TRACK_JOB_PROGRESS_ON_CLIENT = - new
BooleanConfOption("giraph.trackJobProgressOnClient", true, + new BooleanConfOption("giraph.trackJobProgressOnClient", false, "Whether to track job progress on client or not"); /** Number of retries for creating the HDFS files */ http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 8a97939..ba5d2fa 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -25,6 +25,7 @@ import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.master.BspServiceMaster; import org.apache.giraph.master.MasterAggregatorUsage; @@ -69,6 +70,7 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -122,6 +124,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** Superstep stats */ private FinishedSuperstepStats finishedSuperstepStats = new FinishedSuperstepStats(0, false, 0, 0, false, CheckpointStatus.NONE); + /** Job progress tracker */ + private JobProgressTrackerClient jobProgressTracker; // Per-Job Metrics /** Timer for WorkerContext#preApplication() */ @@ -194,6 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, 
context.setStatus("setup: Beginning worker setup."); Configuration hadoopConf = context.getConfiguration(); conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf); + initializeJobProgressTracker(); // Write user's graph types (I,V,E,M) back to configuration parameters so // that they are set for quicker access later. These types are often // inferred from the Computation class used. @@ -245,6 +250,26 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } /** + * Create and connect a client to JobProgressTrackerService, + * or no-op implementation if progress shouldn't be tracked or something + * goes wrong + */ + private void initializeJobProgressTracker() { + if (!conf.trackJobProgressOnClient()) { + jobProgressTracker = new JobProgressTrackerClientNoOp(); + } else { + try { + jobProgressTracker = new RetryableJobProgressTrackerClient(conf); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("createJobProgressClient: Exception occurred while trying to" + + " connect to JobProgressTracker - not reporting progress", e); + jobProgressTracker = new JobProgressTrackerClientNoOp(); + } + } + jobProgressTracker.mapperStarted(); + } + + /** * Perform the work assigned to this compute node for this job run. * 1) Run checkpoint per frequency policy. * 2) For every vertex on this mapper, run the compute() function @@ -485,6 +510,10 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, return serviceWorker.getWorkerContext(); } + public JobProgressTracker getJobProgressTracker() { + return jobProgressTracker; + } + /** * Copied from JobConf to get the location of this jar. Workaround for * things like Oozie map-reduce jobs. 
NOTE: Pure YARN profile cannot @@ -878,6 +907,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, if (LOG.isInfoEnabled()) { LOG.info("cleanup: Starting for " + getGraphFunctions()); } + jobProgressTracker.cleanup(); if (done) { return; } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java new file mode 100644 index 0000000..c302d9a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClient.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import org.apache.giraph.job.JobProgressTracker; + +import java.io.IOException; + +/** + * Wrapper around JobProgressTracker which retries to connect and swallows + * exceptions so app wouldn't crash if something goes wrong with progress + * reports. 
+ */ +public interface JobProgressTrackerClient extends JobProgressTracker { + /** Close the connections if any */ + void cleanup() throws IOException; +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java new file mode 100644 index 0000000..d75fd42 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/JobProgressTrackerClientNoOp.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph; + +import org.apache.giraph.worker.WorkerProgress; + +/** + * Class to use for JobProgressTracker client when progress shouldn't be + * tracked or something goes wrong + */ +public class JobProgressTrackerClientNoOp implements JobProgressTrackerClient { + @Override + public void cleanup() { + } + + @Override + public void mapperStarted() { + } + + @Override + public void logInfo(String logLine) { + } + + @Override + public void logFailure(String reason) { + } + + @Override + public void updateProgress(WorkerProgress workerProgress) { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java new file mode 100644 index 0000000..f15a2e7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/graph/RetryableJobProgressTrackerClient.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.graph; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.job.JobProgressTracker; +import org.apache.giraph.worker.WorkerProgress; +import org.apache.log4j.Logger; + +import com.facebook.nifty.client.FramedClientConnector; +import com.facebook.swift.service.RuntimeTTransportException; +import com.facebook.swift.service.ThriftClientManager; +import com.google.common.io.Closeables; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; + +/** + * Wrapper around JobProgressTracker which retries to connect and swallows + * exceptions so app wouldn't crash if something goes wrong with progress + * reports. + */ +public class RetryableJobProgressTrackerClient + implements JobProgressTrackerClient { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(RetryableJobProgressTrackerClient.class); + /** Configuration */ + private final GiraphConfiguration conf; + /** Thrift client manager to use to connect to job progress tracker */ + private ThriftClientManager clientManager; + /** Job progress tracker */ + private JobProgressTracker jobProgressTracker; + + /** + * Constructor + * + * @param conf Giraph configuration + */ + public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws + ExecutionException, InterruptedException { + this.conf = conf; + resetConnection(); + } + + /** + * Try to establish new connection to JobProgressTracker + */ + private void resetConnection() throws ExecutionException, + InterruptedException { + clientManager = new ThriftClientManager(); + FramedClientConnector connector = + new FramedClientConnector(new InetSocketAddress( + JOB_PROGRESS_SERVICE_HOST.get(conf), + JOB_PROGRESS_SERVICE_PORT.get(conf))); + jobProgressTracker = + clientManager.createClient(connector, JobProgressTracker.class).get(); + + } + + @Override + public synchronized void cleanup() throws IOException { +
Closeables.close(clientManager, true); + try { + clientManager.close(); + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + if (LOG.isDebugEnabled()) { + LOG.debug( + "Exception occurred while trying to close JobProgressTracker", e); + } + } + } + + @Override + public synchronized void mapperStarted() { + executeWithRetry(new Runnable() { + @Override + public void run() { + jobProgressTracker.mapperStarted(); + } + }); + } + + @Override + public synchronized void logInfo(final String logLine) { + executeWithRetry(new Runnable() { + @Override + public void run() { + jobProgressTracker.logInfo(logLine); + } + }); + } + + @Override + public synchronized void logFailure(final String reason) { + executeWithRetry(new Runnable() { + @Override + public void run() { + jobProgressTracker.logFailure(reason); + } + }); + } + + @Override + public synchronized void updateProgress(final WorkerProgress workerProgress) { + executeWithRetry(new Runnable() { + @Override + public void run() { + jobProgressTracker.updateProgress(workerProgress); + } + }); + } + + /** + * Execute Runnable, if disconnected try to connect again and retry + * + * @param runnable Runnable to execute + */ + private void executeWithRetry(Runnable runnable) { + try { + runnable.run(); + } catch (RuntimeTTransportException te) { + if (LOG.isDebugEnabled()) { + LOG.debug("RuntimeTTransportException occurred while talking to " + + "JobProgressTracker server, trying to reconnect", te); + } + try { + try { + clientManager.close(); + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + if (LOG.isDebugEnabled()) { + LOG.debug(""); + } + } + resetConnection(); + runnable.run(); + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + if (LOG.isInfoEnabled()) { + LOG.info("Exception occurred while talking to " + + "JobProgressTracker server, giving up", e); + } + } + // CHECKSTYLE: 
stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + if (LOG.isInfoEnabled()) { + LOG.info("Exception occurred while talking to " + + "JobProgressTracker server, giving up", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java index 436126b..93aa679 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphJob.java @@ -239,6 +239,9 @@ public class GiraphJob { int tryCount = 0; GiraphJobRetryChecker retryChecker = conf.getJobRetryChecker(); while (true) { + JobProgressTrackerService jobProgressTrackerService = + JobProgressTrackerService.createJobProgressServer(conf); + tryCount++; Job submittedJob = new Job(conf, jobName); if (submittedJob.getJar() == null) { @@ -253,16 +256,17 @@ public class GiraphJob { jobObserver.launchingJob(submittedJob); submittedJob.submit(); if (LOG.isInfoEnabled()) { - LOG.info("run: Tracking URL: " + submittedJob.getTrackingURL()); + LOG.info("Tracking URL: " + submittedJob.getTrackingURL()); + LOG.info( + "Waiting for resources... Job will start only when it gets all " + + (conf.getMinWorkers() + 1) + " mappers"); } - HaltApplicationUtils.printHaltInfo(submittedJob, conf); - JobProgressTracker jobProgressTracker = conf.trackJobProgressOnClient() ? 
- new JobProgressTracker(submittedJob, conf) : null; jobObserver.jobRunning(submittedJob); + HaltApplicationUtils.printHaltInfo(submittedJob, conf); boolean passed = submittedJob.waitForCompletion(verbose); - if (jobProgressTracker != null) { - jobProgressTracker.stop(); + if (jobProgressTrackerService != null) { + jobProgressTrackerService.stop(passed); } jobObserver.jobFinished(submittedJob, passed); http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java index 6971174..95bc56d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTracker.java @@ -18,138 +18,53 @@ package org.apache.giraph.job; -import org.apache.giraph.bsp.BspService; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.utils.CounterUtils; -import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.conf.StrConfOption; import org.apache.giraph.worker.WorkerProgress; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.Progressable; -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooDefs; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import com.facebook.swift.service.ThriftMethod; +import com.facebook.swift.service.ThriftService; /** - * Class which tracks job's progress on client + * Interface for job progress tracker on job client */ -public 
class JobProgressTracker implements Watcher { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(JobProgressTracker.class); - /** How often to print job's progress */ - private static final int UPDATE_MILLISECONDS = 5 * 1000; - /** Thread which periodically writes job's progress */ - private Thread writerThread; - /** ZooKeeperExt */ - private ZooKeeperExt zk; - /** Whether application is finished */ - private volatile boolean finished = false; +@ThriftService +public interface JobProgressTracker { + /** Host on which job progress service runs */ + StrConfOption JOB_PROGRESS_SERVICE_HOST = + new StrConfOption("giraph.jobProgressServiceHost", null, + "Host on which job progress service runs"); + /** Port which job progress service uses */ + IntConfOption JOB_PROGRESS_SERVICE_PORT = + new IntConfOption("giraph.jobProgressServicePort", -1, + "Port which job progress service uses"); + + /** Notify JobProgressTracker that mapper started */ + @ThriftMethod + void mapperStarted(); /** - * Constructor + * Call this when you want to log an info line from any mapper to command line * - * @param submittedJob Job to track - * @param conf Configuration + * @param logLine Line to log */ - public JobProgressTracker(final Job submittedJob, - final GiraphConfiguration conf) throws IOException, InterruptedException { - String zkServer = CounterUtils.waitAndGetCounterNameFromGroup( - submittedJob, GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP); - final String basePath = CounterUtils.waitAndGetCounterNameFromGroup( - submittedJob, GiraphConstants.ZOOKEEPER_BASE_PATH_COUNTER_GROUP); - // Connect to ZooKeeper - if (zkServer != null && basePath != null) { - zk = new ZooKeeperExt( - zkServer, - conf.getZooKeeperSessionTimeout(), - conf.getZookeeperOpsMaxAttempts(), - conf.getZookeeperOpsRetryWaitMsecs(), - this, - new Progressable() { - @Override - public void progress() { - } - }); - writerThread = new Thread(new Runnable() { - @Override - public void run() 
{ - String workerProgressBasePath = basePath + - BspService.WORKER_PROGRESSES; - try { - while (!finished) { - if (zk.exists(workerProgressBasePath, false) != null) { - // Get locations of all worker progresses - List<String> workerProgressPaths = zk.getChildrenExt( - workerProgressBasePath, false, false, true); - List<WorkerProgress> workerProgresses = - new ArrayList<WorkerProgress>(workerProgressPaths.size()); - // Read all worker progresses - for (String workerProgressPath : workerProgressPaths) { - WorkerProgress workerProgress = new WorkerProgress(); - byte[] zkData = zk.getData(workerProgressPath, false, null); - WritableUtils.readFieldsFromByteArray(zkData, workerProgress); - workerProgresses.add(workerProgress); - } - // Combine and log - CombinedWorkerProgress combinedWorkerProgress = - new CombinedWorkerProgress(workerProgresses); - if (LOG.isInfoEnabled()) { - LOG.info(combinedWorkerProgress.toString()); - } - // Check if application is done - if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { - break; - } - } - Thread.sleep(UPDATE_MILLISECONDS); - } - // CHECKSTYLE: stop IllegalCatchCheck - } catch (Exception e) { - // CHECKSTYLE: resume IllegalCatchCheck - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); - } - } finally { - try { - // Create a node so master knows we stopped communicating with - // ZooKeeper and it's safe to cleanup - zk.createExt( - basePath + BspService.CLEANED_UP_DIR + "/client", - null, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true); - zk.close(); - // CHECKSTYLE: stop IllegalCatchCheck - } catch (Exception e) { - // CHECKSTYLE: resume IllegalCatchCheck - if (LOG.isInfoEnabled()) { - LOG.info("run: Exception occurred", e); - } - } - } - } - }); - writerThread.start(); - } - } + @ThriftMethod + void logInfo(String logLine); /** - * Stop the thread which logs application progress + * Notify that job is failing + * + * @param reason Reason for failure */ - public void stop() { - finished = 
true; - } + @ThriftMethod + void logFailure(String reason); - @Override - public void process(WatchedEvent event) { - } + /** + * Workers should call this method to update their progress + * + * @param workerProgress Progress of the worker + */ + @ThriftMethod + void updateProgress(WorkerProgress workerProgress); } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java new file mode 100644 index 0000000..3a896e2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/job/JobProgressTrackerService.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.job; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.worker.WorkerProgress; +import org.apache.log4j.Logger; + +import com.facebook.swift.codec.ThriftCodecManager; +import com.facebook.swift.service.ThriftEventHandler; +import com.facebook.swift.service.ThriftServer; +import com.facebook.swift.service.ThriftServerConfig; +import com.facebook.swift.service.ThriftServiceProcessor; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of job progress tracker service on job client + */ +public class JobProgressTrackerService implements JobProgressTracker { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(JobProgressTrackerService.class); + /** How often to print job's progress */ + private static final int UPDATE_MILLISECONDS = 10 * 1000; + + /** Configuration */ + private final GiraphConfiguration conf; + /** Thread which periodically writes job's progress */ + private Thread writerThread; + /** Whether application is finished */ + private volatile boolean finished = false; + /** Server which uses this service */ + private ThriftServer server; + /** Number of mappers which the job got */ + private int mappersStarted; + /** Last time number of mappers started was logged */ + private long lastTimeMappersStartedLogged; + /** Map of worker progresses */ + private final Map<Integer, WorkerProgress> workerProgresses = + new ConcurrentHashMap<>(); + + /** + * Constructor + * + * @param conf Configuration + */ + public JobProgressTrackerService(GiraphConfiguration conf) { + this.conf = conf; + if (LOG.isInfoEnabled()) { + LOG.info("Waiting for job to start... 
(this may take a minute)"); + } + startWriterThread(); + } + + /** + * Start the thread which writes progress periodically + */ + private void startWriterThread() { + writerThread = new Thread(new Runnable() { + @Override + public void run() { + while (!finished) { + if (mappersStarted == conf.getMaxWorkers() + 1 && + !workerProgresses.isEmpty()) { + // Combine and log + CombinedWorkerProgress combinedWorkerProgress = + new CombinedWorkerProgress(workerProgresses.values()); + if (LOG.isInfoEnabled()) { + LOG.info(combinedWorkerProgress.toString()); + } + // Check if application is done + if (combinedWorkerProgress.isDone(conf.getMaxWorkers())) { + break; + } + } + try { + Thread.sleep(UPDATE_MILLISECONDS); + } catch (InterruptedException e) { + if (LOG.isInfoEnabled()) { + LOG.info("Progress thread interrupted"); + } + break; + } + } + } + }); + writerThread.start(); + } + + @Override + public synchronized void mapperStarted() { + mappersStarted++; + if (LOG.isInfoEnabled()) { + if (mappersStarted == conf.getMaxWorkers() + 1) { + LOG.info("Got all " + mappersStarted + " mappers"); + } else { + if (System.currentTimeMillis() - lastTimeMappersStartedLogged > + UPDATE_MILLISECONDS) { + lastTimeMappersStartedLogged = System.currentTimeMillis(); + LOG.info("Got " + mappersStarted + " but needs " + + (conf.getMaxWorkers() + 1) + " mappers"); + } + } + } + } + + @Override + public void logInfo(String logLine) { + if (LOG.isInfoEnabled()) { + LOG.info(logLine); + } + } + + @Override + public void logFailure(String reason) { + LOG.fatal(reason); + finished = true; + writerThread.interrupt(); + } + + @Override + public void updateProgress(WorkerProgress workerProgress) { + workerProgresses.put(workerProgress.getTaskId(), workerProgress); + } + + /** + * Stop the thread which logs application progress and server + * + * @param succeeded Whether job succeeded or not + */ + public void stop(boolean succeeded) { + finished = true; + writerThread.interrupt(); + server.close(); + 
if (LOG.isInfoEnabled()) { + LOG.info("Job " + (succeeded ? "finished successfully" : "failed") + + ", cleaning up..."); + } + } + + /** + * Create job progress server on job client, and update configuration with + * its hostname and port so mappers would know what to connect to. Returns + * null if progress shouldn't be tracked + * + * @param conf Configuration + * @return JobProgressTrackerService + */ + public static JobProgressTrackerService createJobProgressServer( + GiraphConfiguration conf) { + if (!conf.trackJobProgressOnClient()) { + return null; + } + try { + JobProgressTrackerService service = new JobProgressTrackerService(conf); + ThriftServiceProcessor processor = + new ThriftServiceProcessor(new ThriftCodecManager(), + new ArrayList<ThriftEventHandler>(), service); + service.server = new ThriftServer(processor, new ThriftServerConfig()); + service.server.start(); + JOB_PROGRESS_SERVICE_HOST.set(conf, + InetAddress.getLocalHost().getHostName()); + JOB_PROGRESS_SERVICE_PORT.set(conf, service.server.getPort()); + return service; + // CHECKSTYLE: stop IllegalCatch + } catch (Exception e) { + // CHECKSTYLE: resume IllegalCatch + LOG.warn("Exception occurred while trying to create " + + "JobProgressTrackerService - not using progress reporting", e); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 671df23..efa5b87 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -307,6 +307,7 @@ public class BspServiceMaster<I extends WritableComparable, * @param reason The reason the job failed */ private void 
setJobStateFailed(String reason) { + getGraphTaskManager().getJobProgressTracker().logFailure(reason); setJobState(ApplicationState.FAILED, -1, -1, false); failJob(new IllegalStateException(reason)); } @@ -644,7 +645,8 @@ public class BspServiceMaster<I extends WritableComparable, "check input of " + inputFormat.getClass().getName() + "!"); getContext().setStatus("Failing job due to 0 input splits, " + "check input of " + inputFormat.getClass().getName() + "!"); - setJobStateFailed("0 input splits"); + setJobStateFailed("Please check your input tables - partitions which " + + "you specified are missing. Failing the job!!!"); } if (minSplitCountHint > splitList.size()) { LOG.warn(logPrefix + ": Number of inputSplits=" + @@ -885,7 +887,7 @@ public class BspServiceMaster<I extends WritableComparable, getContext()); aggregatorHandler.initialize(this); masterCompute = getConfiguration().createMasterCompute(); - masterCompute.setMasterAggregatorUsage(aggregatorHandler); + masterCompute.setMasterService(this); masterInfo = new MasterInfo(); masterServer = @@ -1790,10 +1792,6 @@ public class BspServiceMaster<I extends WritableComparable, GraphFunctions.ALL_EXCEPT_ZOOKEEPER)) { maxTasks *= 2; } - if (getConfiguration().trackJobProgressOnClient()) { - // For job client - maxTasks++; - } List<String> cleanedUpChildrenList = null; while (true) { try { http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index c2a1f9a..552cca9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -19,6 +19,7 @@ package org.apache.giraph.master; import 
org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.Computation; @@ -46,7 +47,7 @@ public abstract class MasterCompute /** If true, do not do anymore computation on this vertex. */ private boolean halt = false; /** Master aggregator usage */ - private MasterAggregatorUsage masterAggregatorUsage; + private CentralizedServiceMaster serviceMaster; /** Graph state */ private GraphState graphState; /** @@ -192,14 +193,16 @@ public abstract class MasterCompute public final <A extends Writable> boolean registerAggregator( String name, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - return masterAggregatorUsage.registerAggregator(name, aggregatorClass); + return serviceMaster.getAggregatorHandler().registerAggregator( + name, aggregatorClass); } @Override public final <A extends Writable> boolean registerAggregator( String name, WritableFactory<? extends Aggregator<A>> aggregator) throws InstantiationException, IllegalAccessException { - return masterAggregatorUsage.registerAggregator(name, aggregator); + return serviceMaster.getAggregatorHandler().registerAggregator( + name, aggregator); } @Override @@ -207,28 +210,37 @@ public abstract class MasterCompute String name, Class<? 
extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - return masterAggregatorUsage.registerPersistentAggregator( + return serviceMaster.getAggregatorHandler().registerPersistentAggregator( name, aggregatorClass); } @Override public final <A extends Writable> A getAggregatedValue(String name) { - return masterAggregatorUsage.<A>getAggregatedValue(name); + return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name); } @Override public final <A extends Writable> void setAggregatedValue( String name, A value) { - masterAggregatorUsage.setAggregatedValue(name, value); + serviceMaster.getAggregatorHandler().setAggregatedValue(name, value); + } + + /** + * Call this to log a line to command line of the job. Use in moderation - + * it's a synchronous call to Job client + * + * @param line Line to print + */ + public void logToCommandLine(String line) { + serviceMaster.getJobProgressTracker().logInfo(line); } final void setGraphState(GraphState graphState) { this.graphState = graphState; } - final void setMasterAggregatorUsage(MasterAggregatorUsage - masterAggregatorUsage) { - this.masterAggregatorUsage = masterAggregatorUsage; + final void setMasterService(CentralizedServiceMaster serviceMaster) { + this.serviceMaster = serviceMaster; } final void setSuperstepClasses(SuperstepClasses superstepClasses) { http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java index 2b30739..f78b1a0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java +++ b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java @@ -150,7 +150,7 @@ public class ScriptLoader { * * @param conf Configuration 
*/ - public static void loadScripts(Configuration conf) { + public static void loadScripts(Configuration conf) throws IOException { List<DeployedScript> deployedScripts = getScriptsToLoad(conf); if (deployedScripts == null) { return; @@ -167,7 +167,7 @@ public class ScriptLoader { * @param deployedScript the deployed script */ public static void loadScript(Configuration conf, - DeployedScript deployedScript) { + DeployedScript deployedScript) throws IOException { InputStream stream = openScriptInputStream(conf, deployedScript); switch (deployedScript.getLanguage()) { case JYTHON: @@ -180,7 +180,7 @@ public class ScriptLoader { } LOADED_SCRIPTS.add(deployedScript); - Closeables.closeQuietly(stream); + Closeables.close(stream, true); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java index 6e8b1e3..0f9a08a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java @@ -129,7 +129,7 @@ public class FileUtils { writer.write('\n'); } } finally { - Closeables.closeQuietly(writer); + Closeables.close(writer, true); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 447bb6f..120678f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -229,7 +229,8 @@ public class 
BspServiceWorker<I extends WritableComparable, WorkerProgress.get().setTaskId(getTaskPartition()); workerProgressWriter = conf.trackJobProgressOnClient() ? - new WorkerProgressWriter(myProgressPath, getZkExt()) : null; + new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) : + null; GiraphMetrics.get().addSuperstepResetObserver(this); } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java index aca9944..7a55d56 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java @@ -206,6 +206,16 @@ public abstract class WorkerContext return workerAggregatorUsage.<A>getAggregatedValue(name); } + /** + * Call this to log a line to command line of the job. 
Use in moderation - + * it's a synchronous call to Job client + * + * @param line Line to print + */ + public void logToCommandLine(String line) { + serviceWorker.getJobProgressTracker().logInfo(line); + } + @Override public void write(DataOutput dataOutput) throws IOException { } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java index 1a2a6ee..24f791b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgress.java @@ -19,17 +19,9 @@ package org.apache.giraph.worker; import org.apache.giraph.utils.MemoryUtils; -import org.apache.giraph.utils.WritableUtils; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.io.Writable; -import org.apache.log4j.Logger; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooDefs; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.codec.ThriftStruct; import javax.annotation.concurrent.ThreadSafe; @@ -38,9 +30,8 @@ import javax.annotation.concurrent.ThreadSafe; * ZooKeeper with {@link WorkerProgressWriter}. 
*/ @ThreadSafe -public class WorkerProgress implements Writable { - /** Class logger */ - private static final Logger LOG = Logger.getLogger(WorkerProgress.class); +@ThriftStruct +public class WorkerProgress { /** Singleton instance for everyone to use */ private static final WorkerProgress INSTANCE = new WorkerProgress(); @@ -99,45 +90,6 @@ public class WorkerProgress implements Writable { } /** - * Write worker's progress to znode - * - * @param zk ZooKeeperExt - * @param myProgressPath Path to write the progress to - */ - public static void writeToZnode(ZooKeeperExt zk, String myProgressPath) { - byte[] byteArray = WritableUtils.writeToByteArray(get()); - try { - zk.createOrSetExt(myProgressPath, - byteArray, - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT, - true, - -1); - } catch (KeeperException | InterruptedException e) { - if (LOG.isInfoEnabled()) { - LOG.info("writeToZnode: " + e.getClass().getName() + - " exception occurred", e); - } - } - } - - public synchronized boolean isLoadingVerticesDone() { - return loadingVerticesDone; - } - - public synchronized boolean isLoadingEdgesDone() { - return loadingEdgesDone; - } - - public synchronized boolean isComputationDone() { - return computationDone; - } - - public synchronized boolean isStoringDone() { - return storingDone; - } - - /** * Add number of vertices loaded * * @param verticesLoaded How many vertices were loaded since the last @@ -188,8 +140,8 @@ public class WorkerProgress implements Writable { /** * Notify this class that next computation superstep is starting * - * @param superstep Superstep which is starting - * @param verticesToCompute How many vertices are there to compute + * @param superstep Superstep which is starting + * @param verticesToCompute How many vertices are there to compute * @param partitionsToCompute How many partitions are there to compute */ public synchronized void startSuperstep(long superstep, @@ -221,7 +173,7 @@ public class WorkerProgress implements Writable { /** 
* Notify this class that worker is starting to store data * - * @param verticesToStore How many vertices should be stored + * @param verticesToStore How many vertices should be stored * @param partitionsToStore How many partitions should be stored */ public synchronized void startStoring(long verticesToStore, @@ -260,10 +212,6 @@ public class WorkerProgress implements Writable { storingDone = true; } - public synchronized void setTaskId(int taskId) { - this.taskId = taskId; - } - /** * Update memory info */ @@ -271,58 +219,101 @@ public class WorkerProgress implements Writable { freeMemoryMB = MemoryUtils.freeMemoryMB(); } + @ThriftField(1) public synchronized long getCurrentSuperstep() { return currentSuperstep; } + @ThriftField(2) public synchronized long getVerticesLoaded() { return verticesLoaded; } + @ThriftField(3) public synchronized int getVertexInputSplitsLoaded() { return vertexInputSplitsLoaded; } + @ThriftField(4) + public synchronized boolean isLoadingVerticesDone() { + return loadingVerticesDone; + } + + @ThriftField(5) public synchronized long getEdgesLoaded() { return edgesLoaded; } + @ThriftField(6) public synchronized int getEdgeInputSplitsLoaded() { return edgeInputSplitsLoaded; } + @ThriftField(7) + public synchronized boolean isLoadingEdgesDone() { + return loadingEdgesDone; + } + + @ThriftField(8) public synchronized long getVerticesToCompute() { return verticesToCompute; } + @ThriftField(9) public synchronized long getVerticesComputed() { return verticesComputed; } + @ThriftField(10) public synchronized int getPartitionsToCompute() { return partitionsToCompute; } + @ThriftField(11) public synchronized int getPartitionsComputed() { return partitionsComputed; } + @ThriftField(12) + public synchronized boolean isComputationDone() { + return computationDone; + } + + @ThriftField(13) public synchronized long getVerticesToStore() { return verticesToStore; } + @ThriftField(14) public synchronized long getVerticesStored() { return verticesStored; } + 
@ThriftField(15) public synchronized int getPartitionsToStore() { return partitionsToStore; } + @ThriftField(16) public synchronized int getPartitionsStored() { return partitionsStored; } + @ThriftField(17) + public synchronized boolean isStoringDone() { + return storingDone; + } + + @ThriftField(18) + public synchronized int getTaskId() { + return taskId; + } + + @ThriftField(19) + public synchronized double getFreeMemoryMB() { + return freeMemoryMB; + } + public synchronized boolean isInputSuperstep() { return currentSuperstep == -1; } @@ -335,69 +326,98 @@ public class WorkerProgress implements Writable { return currentSuperstep == Long.MAX_VALUE; } - public synchronized int getTaskId() { - return taskId; + @ThriftField + public void setCurrentSuperstep(long currentSuperstep) { + this.currentSuperstep = currentSuperstep; } - public synchronized double getFreeMemoryMB() { - return freeMemoryMB; + @ThriftField + public void setVerticesLoaded(long verticesLoaded) { + this.verticesLoaded = verticesLoaded; } - @Override - public synchronized void write(DataOutput dataOutput) throws IOException { - dataOutput.writeLong(currentSuperstep); + @ThriftField + public void setVertexInputSplitsLoaded(int vertexInputSplitsLoaded) { + this.vertexInputSplitsLoaded = vertexInputSplitsLoaded; + } - dataOutput.writeLong(verticesLoaded); - dataOutput.writeInt(vertexInputSplitsLoaded); - dataOutput.writeBoolean(loadingVerticesDone); - dataOutput.writeLong(edgesLoaded); - dataOutput.writeInt(edgeInputSplitsLoaded); - dataOutput.writeBoolean(loadingEdgesDone); + @ThriftField + public void setLoadingVerticesDone(boolean loadingVerticesDone) { + this.loadingVerticesDone = loadingVerticesDone; + } - dataOutput.writeLong(verticesToCompute); - dataOutput.writeLong(verticesComputed); - dataOutput.writeInt(partitionsToCompute); - dataOutput.writeInt(partitionsComputed); + @ThriftField + public void setEdgesLoaded(long edgesLoaded) { + this.edgesLoaded = edgesLoaded; + } - 
dataOutput.writeBoolean(computationDone); + @ThriftField + public void setEdgeInputSplitsLoaded(int edgeInputSplitsLoaded) { + this.edgeInputSplitsLoaded = edgeInputSplitsLoaded; + } - dataOutput.writeLong(verticesToStore); - dataOutput.writeLong(verticesStored); - dataOutput.writeInt(partitionsToStore); - dataOutput.writeInt(partitionsStored); - dataOutput.writeBoolean(storingDone); + @ThriftField + public void setLoadingEdgesDone(boolean loadingEdgesDone) { + this.loadingEdgesDone = loadingEdgesDone; + } - dataOutput.writeInt(taskId); + @ThriftField + public void setVerticesToCompute(long verticesToCompute) { + this.verticesToCompute = verticesToCompute; + } - dataOutput.writeDouble(freeMemoryMB); + @ThriftField + public void setVerticesComputed(long verticesComputed) { + this.verticesComputed = verticesComputed; } - @Override - public synchronized void readFields(DataInput dataInput) throws IOException { - currentSuperstep = dataInput.readLong(); + @ThriftField + public void setPartitionsToCompute(int partitionsToCompute) { + this.partitionsToCompute = partitionsToCompute; + } - verticesLoaded = dataInput.readLong(); - vertexInputSplitsLoaded = dataInput.readInt(); - loadingVerticesDone = dataInput.readBoolean(); - edgesLoaded = dataInput.readLong(); - edgeInputSplitsLoaded = dataInput.readInt(); - loadingEdgesDone = dataInput.readBoolean(); + @ThriftField + public void setPartitionsComputed(int partitionsComputed) { + this.partitionsComputed = partitionsComputed; + } - verticesToCompute = dataInput.readLong(); - verticesComputed = dataInput.readLong(); - partitionsToCompute = dataInput.readInt(); - partitionsComputed = dataInput.readInt(); + @ThriftField + public void setComputationDone(boolean computationDone) { + this.computationDone = computationDone; + } - computationDone = dataInput.readBoolean(); + @ThriftField + public void setVerticesToStore(long verticesToStore) { + this.verticesToStore = verticesToStore; + } - verticesToStore = dataInput.readLong(); - 
verticesStored = dataInput.readLong(); - partitionsToStore = dataInput.readInt(); - partitionsStored = dataInput.readInt(); - storingDone = dataInput.readBoolean(); + @ThriftField + public void setVerticesStored(long verticesStored) { + this.verticesStored = verticesStored; + } - taskId = dataInput.readInt(); + @ThriftField + public void setPartitionsToStore(int partitionsToStore) { + this.partitionsToStore = partitionsToStore; + } + + @ThriftField + public void setPartitionsStored(int partitionsStored) { + this.partitionsStored = partitionsStored; + } + + @ThriftField + public void setStoringDone(boolean storingDone) { + this.storingDone = storingDone; + } - freeMemoryMB = dataInput.readDouble(); + @ThriftField + public void setFreeMemoryMB(double freeMemoryMB) { + this.freeMemoryMB = freeMemoryMB; + } + + @ThriftField + public synchronized void setTaskId(int taskId) { + this.taskId = taskId; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java index 4ff5bb1..dae9963 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerProgressWriter.java @@ -18,7 +18,7 @@ package org.apache.giraph.worker; -import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.giraph.job.JobProgressTracker; import org.apache.log4j.Logger; /** @@ -31,33 +31,27 @@ public class WorkerProgressWriter { /** How often to update worker's progress */ private static final int WRITE_UPDATE_PERIOD_MILLISECONDS = 10 * 1000; + /** Job progress tracker */ + private final JobProgressTracker jobProgressTracker; /** Thread which writes worker's progress */ private final Thread 
writerThread; /** Whether worker finished application */ private volatile boolean finished = false; - /** Path where this worker's progress should be stored */ - private final String myProgressPath; - /** ZooKeeperExt */ - private final ZooKeeperExt zk; /** * Constructor, starts separate thread to periodically update worker's * progress * - * @param myProgressPath Path where this worker's progress should be stored - * @param zk ZooKeeperExt + * @param jobProgressTracker JobProgressTracker to report job progress to */ - public WorkerProgressWriter(String myProgressPath, ZooKeeperExt zk) { - this.myProgressPath = myProgressPath; - this.zk = zk; + public WorkerProgressWriter(JobProgressTracker jobProgressTracker) { + this.jobProgressTracker = jobProgressTracker; writerThread = new Thread(new Runnable() { @Override public void run() { try { while (!finished) { - WorkerProgress.get().updateMemory(); - WorkerProgress.writeToZnode(WorkerProgressWriter.this.zk, - WorkerProgressWriter.this.myProgressPath); + updateAndSendProgress(); double factor = 1 + Math.random(); Thread.sleep((long) (WRITE_UPDATE_PERIOD_MILLISECONDS * factor)); } @@ -73,12 +67,20 @@ public class WorkerProgressWriter { } /** + * Update worker progress and send it + */ + private void updateAndSendProgress() { + WorkerProgress.get().updateMemory(); + jobProgressTracker.updateProgress(WorkerProgress.get()); + } + + /** * Stop the thread which writes worker's progress */ public void stop() throws InterruptedException { finished = true; writerThread.interrupt(); writerThread.join(); - WorkerProgress.writeToZnode(zk, myProgressPath); + updateAndSendProgress(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java 
index 73ef97b..b5816d7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -619,13 +619,13 @@ public class ZooKeeperManager { myidWriter = new FileWriter(zkDir + "/myid"); myidWriter.write(i + "\n"); } finally { - Closeables.closeQuietly(myidWriter); + Closeables.close(myidWriter, true); } } } } } finally { - Closeables.closeQuietly(writer); + Closeables.close(writer, true); } } catch (IOException e) { throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-core/src/test/java/org/apache/giraph/BspCase.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java index dd2369a..b372ab7 100644 --- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java +++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java @@ -268,8 +268,8 @@ public class BspCase implements Watcher { numResults++; } } finally { - Closeables.closeQuietly(in); - Closeables.closeQuietly(reader); + Closeables.close(in, true); + Closeables.close(reader, true); } } return numResults; http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml index f8304a1..f95edcb 100644 --- a/giraph-examples/pom.xml +++ b/giraph-examples/pom.xml @@ -226,14 +226,6 @@ under the License. 
</plugin> </plugins> </build> - <dependencies> - <dependency> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - <version>${dep.oldnetty.version}</version> - <scope>test</scope> - </dependency> - </dependencies> </profile> <profile> http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java index 5612e5f..488e1ea 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java @@ -461,8 +461,8 @@ public class assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep)); } finally { - Closeables.closeQuietly(in); - Closeables.closeQuietly(reader); + Closeables.close(in, true); + Closeables.close(reader, true); } } } finally { http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java index 7ae8bc3..517901a 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java @@ -176,7 +176,7 @@ public class HiveJythonUtils { jythonJob = parseJythonStreams(interpreter, streams); } finally { for (InputStream stream : streams) { - Closeables.closeQuietly(stream); + Closeables.close(stream, true); } } return jythonJob; http://git-wip-us.apache.org/repos/asf/giraph/blob/4485e563/pom.xml ---------------------------------------------------------------------- diff --git 
a/pom.xml b/pom.xml index b4d78ae..2e3eb63 100644 --- a/pom.xml +++ b/pom.xml @@ -289,6 +289,7 @@ under the License. <dep.commons-logging.version>1.1.1</dep.commons-logging.version> <dep.commons-io.version>2.1</dep.commons-io.version> <dep.commons-net.version>3.1</dep.commons-net.version> + <dep.facebook-swift.version>0.13.1</dep.facebook-swift.version> <dep.fasterxml-jackson.version>2.1.2</dep.fasterxml-jackson.version> <dep.fastutil.version>6.5.4</dep.fastutil.version> <dep.google.findbugs.version>2.0.2</dep.google.findbugs.version> @@ -309,7 +310,7 @@ under the License. <!-- note: old version of netty is required by hadoop_facebook for tests to succeed --> <dep.oldnetty.version>3.2.2.Final</dep.oldnetty.version> <dep.netty.version>4.0.14.Final</dep.netty.version> - <dep.paranamer.version>2.3</dep.paranamer.version> + <dep.paranamer.version>2.5.2</dep.paranamer.version> <dep.slf4j.version>1.7.5</dep.slf4j.version> <dep.tinkerpop.rexter.version>2.4.0</dep.tinkerpop.rexter.version> <dep.typetools.version>0.2.1</dep.typetools.version> @@ -1389,6 +1390,14 @@ under the License. <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils-core</artifactId> </exclusion> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>units</artifactId> + </exclusion> + <exclusion> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -1404,6 +1413,14 @@ under the License. <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>units</artifactId> + </exclusion> + <exclusion> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -1594,6 +1611,60 @@ under the License. 
<version>${dep.yourkit-api.version}</version> </dependency> <dependency> + <groupId>com.facebook.nifty</groupId> + <artifactId>nifty-client</artifactId> + <version>${dep.facebook-swift.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-codec</artifactId> + <version>${dep.facebook-swift.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-all</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-annotations</artifactId> + <version>${dep.facebook-swift.version}</version> + </dependency> + <dependency> + <groupId>com.facebook.swift</groupId> + <artifactId>swift-service</artifactId> + <version>${dep.facebook-swift.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${dep.log4j.version}</version> @@ -2086,6 +2157,14 @@ under the License. 
<groupId>commons-beanutils</groupId> <artifactId>commons-beanutils-core</artifactId> </exclusion> + <exclusion> + <groupId>io.airlift</groupId> + <artifactId>units</artifactId> + </exclusion> + <exclusion> + <groupId>javax.validation</groupId> + <artifactId>validation-api</artifactId> + </exclusion> </exclusions> </dependency> <dependency>
