Updated Branches: refs/heads/trunk e709fda6b -> 73d70bd89
GIRAPH-565: Make an easy way to gather some logs from workers on master (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/73d70bd8 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/73d70bd8 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/73d70bd8 Branch: refs/heads/trunk Commit: 73d70bd8913a0fb8465138b1b3db2e2f384e7fe5 Parents: e709fda Author: Maja Kabiljo <[email protected]> Authored: Fri Mar 15 14:40:25 2013 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Mar 15 14:40:25 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/aggregators/TextAppendAggregator.java | 37 ++++ .../giraph/master/MasterAggregatorHandler.java | 5 +- .../giraph/utils/MasterLoggingAggregator.java | 143 +++++++++++++++ 4 files changed, 186 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/73d70bd8/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index ebdb6df..38350fa 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-565: Make an easy way to gather some logs from workers on master (majakabiljo) + GIRAPH-559: use hive-io-experimental release (nitay) GIRAPH-562: Implement getConf/setConf in AbstractHive classes (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/73d70bd8/giraph-core/src/main/java/org/apache/giraph/aggregators/TextAppendAggregator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/TextAppendAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/TextAppendAggregator.java new file mode 100644 index 0000000..b94fb60 --- /dev/null +++ 
b/giraph-core/src/main/java/org/apache/giraph/aggregators/TextAppendAggregator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.aggregators;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Aggregator with {@link Text} as its value which keeps appending text to it
+ */
+public class TextAppendAggregator extends BasicAggregator<Text> {
+  @Override
+  public void aggregate(Text value) {
+    // getBytes() returns the backing array; only getLength() bytes are valid
+    getAggregatedValue().append(value.getBytes(), 0, value.getLength());
+  }
+
+  @Override
+  public Text createInitialValue() {
+    return new Text();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/73d70bd8/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
index f5400d2..f769c3a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java
@@ -26,6 +26,7 @@
import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.aggregators.AggregatorWrapper; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.bsp.BspService; +import org.apache.giraph.utils.MasterLoggingAggregator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -72,6 +73,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, this.conf = conf; this.progressable = progressable; aggregatorWriter = conf.createAggregatorWriter(); + MasterLoggingAggregator.registerAggregator(this, conf); } @Override @@ -160,7 +162,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, */ public void prepareSuperstep(MasterClient masterClient) { if (LOG.isDebugEnabled()) { - LOG.debug("prepareSuperstep: Start preapring aggregators"); + LOG.debug("prepareSuperstep: Start preparing aggregators"); } // prepare aggregators for master compute for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) { @@ -172,6 +174,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, aggregator.resetCurrentAggregator(); progressable.progress(); } + MasterLoggingAggregator.logAggregatedValue(this, conf); if (LOG.isDebugEnabled()) { LOG.debug("prepareSuperstep: Aggregators prepared"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/73d70bd8/giraph-core/src/main/java/org/apache/giraph/utils/MasterLoggingAggregator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/MasterLoggingAggregator.java b/giraph-core/src/main/java/org/apache/giraph/utils/MasterLoggingAggregator.java new file mode 100644 index 0000000..f438408 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/MasterLoggingAggregator.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.aggregators.TextAppendAggregator;
+import org.apache.giraph.master.MasterAggregatorUsage;
+import org.apache.giraph.worker.WorkerAggregatorUsage;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * Helper class for using the aggregator which gathers log messages from
+ * workers and prints them on master.
+ *
+ * If you want to track what's going on in your application,
+ * and want to have all those logs accessible in a single place at the end of
+ * each superstep, you can use the option from this class.
+ *
+ * If you use a lot of log messages this might slow down your application,
+ * but it can easily be turned on/off without changing your code just by
+ * switching the option.
+ */
+public class MasterLoggingAggregator {
+  /** Whether or not to use master logging aggregator */
+  public static final String USE_MASTER_LOGGING_AGGREGATOR =
+      "giraph.useMasterLoggingAggregator";
+  /** Default is not using master logging aggregator */
+  public static final boolean USE_MASTER_LOGGING_AGGREGATOR_DEFAULT = false;
+  /** Name of aggregator which will be gathering the logs */
+  public static final String MASTER_LOGGING_AGGREGATOR_NAME =
+      "masterLoggingAggregator";
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(MasterLoggingAggregator.class);
+
+  /** Do not instantiate */
+  private MasterLoggingAggregator() {
+  }
+
+  /**
+   * Check if master logging aggregator is used.
+   *
+   * @param conf Configuration
+   * @return True iff master logging aggregator is used
+   */
+  public static boolean useMasterLoggingAggregator(Configuration conf) {
+    return conf.getBoolean(USE_MASTER_LOGGING_AGGREGATOR,
+        USE_MASTER_LOGGING_AGGREGATOR_DEFAULT);
+  }
+
+  /**
+   * Set whether or not master logging aggregator should be used
+   *
+   * @param useMasterLoggingAggregator Whether or not we want
+   *                                   master logging aggregator to be used
+   * @param conf                       Configuration
+   */
+  public static void setUseMasterLoggingAggregator(
+      boolean useMasterLoggingAggregator, Configuration conf) {
+    conf.setBoolean(USE_MASTER_LOGGING_AGGREGATOR, useMasterLoggingAggregator);
+  }
+
+  /**
+   * Aggregate some message to master logging aggregator,
+   * if the option for using it is set in the configuration.
+   *
+   * This is the method application implementation should use
+   * in order to add message to the aggregator.
+   *
+   * @param message               Message to log
+   * @param workerAggregatorUsage Worker aggregator usage
+   *                              (can be Vertex, WorkerContext, etc)
+   * @param conf                  Configuration
+   */
+  public static void aggregate(String message,
+      WorkerAggregatorUsage workerAggregatorUsage, Configuration conf) {
+    if (useMasterLoggingAggregator(conf)) {
+      workerAggregatorUsage.aggregate(
+          MASTER_LOGGING_AGGREGATOR_NAME, new Text(message));
+    }
+  }
+
+  /**
+   * Register master logging aggregator,
+   * if the option for using it is set in the configuration.
+   *
+   * This method will be called by Giraph infrastructure on master.
+   *
+   * @param masterAggregatorUsage Master aggregator usage
+   * @param conf                  Configuration
+   */
+  public static void registerAggregator(
+      MasterAggregatorUsage masterAggregatorUsage, Configuration conf) {
+    if (useMasterLoggingAggregator(conf)) {
+      try {
+        masterAggregatorUsage.registerAggregator(MASTER_LOGGING_AGGREGATOR_NAME,
+            TextAppendAggregator.class);
+      } catch (InstantiationException e) {
+        throw new IllegalStateException("registerAggregator: " +
+            "InstantiationException occurred", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException("registerAggregator: " +
+            "IllegalAccessException occurred", e);
+      }
+    }
+  }
+
+  /**
+   * Print value of master logging aggregator on the master log,
+   * if the option for using it is set in the configuration.
+   *
+   * This method will be called by Giraph infrastructure on master.
+   *
+   * @param masterAggregatorUsage Master aggregator usage
+   * @param conf                  Configuration
+   */
+  public static void logAggregatedValue(
+      MasterAggregatorUsage masterAggregatorUsage, Configuration conf) {
+    if (useMasterLoggingAggregator(conf) && LOG.isInfoEnabled()) {
+      LOG.info("logAggregatedValue: \n" +
+          masterAggregatorUsage.getAggregatedValue(
+              MASTER_LOGGING_AGGREGATOR_NAME));
+    }
+  }
+}
