Repository: giraph
Updated Branches:
  refs/heads/trunk d86d0d56e -> 8e6ec2661
GIRAPH-1174 closes #62 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8e6ec266 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8e6ec266 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8e6ec266 Branch: refs/heads/trunk Commit: 8e6ec266190d59558f369f808c53cf9186887ce5 Parents: d86d0d5 Author: Maja Kabiljo <majakabi...@fb.com> Authored: Fri Mar 9 14:03:41 2018 -0800 Committer: Maja Kabiljo <majakabi...@fb.com> Committed: Tue Mar 13 16:29:02 2018 -0700 ---------------------------------------------------------------------- .../apache/giraph/conf/GiraphConfiguration.java | 20 ++++++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 5 +++ .../ImmutableClassesGiraphConfiguration.java | 17 ++++++++++ .../apache/giraph/graph/GraphTaskManager.java | 5 +++ .../org/apache/giraph/utils/GcObserver.java | 33 ++++++++++++++++++++ 5 files changed, 80 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index ffed2e0..e269de4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -51,6 +51,7 @@ import org.apache.giraph.master.MasterObserver; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.ReusesObjectsPartition; +import org.apache.giraph.utils.GcObserver; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.worker.WorkerContext; import 
org.apache.giraph.worker.WorkerObserver; @@ -324,6 +325,16 @@ public class GiraphConfiguration extends Configuration } /** + * Add a GcObserver class (optional) + * + * @param gcObserverClass GcObserver class to add. + */ + public final void addGcObserverClass( + Class<? extends GcObserver> gcObserverClass) { + GC_OBSERVER_CLASSES.add(this, gcObserverClass); + } + + /** * Get job observer class * * @return GiraphJobObserver class set. @@ -707,6 +718,15 @@ public class GiraphConfiguration extends Configuration } /** + * Get array of GcObserver classes set in configuration. + * + * @return array of GcObserver classes. + */ + public Class<? extends GcObserver>[] getGcObserverClasses() { + return GC_OBSERVER_CLASSES.getArray(this); + } + + /** * Whether to track, print, and aggregate metrics. * * @return true if metrics are enabled, false otherwise (default) http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 44b2a44..db13670 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -84,6 +84,7 @@ import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.partition.HashPartitionerFactory; import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.SimplePartition; +import org.apache.giraph.utils.GcObserver; import org.apache.giraph.worker.DefaultWorkerContext; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerObserver; @@ -229,6 +230,10 @@ public interface GiraphConstants { ClassConfOption<MapperObserver> MAPPER_OBSERVER_CLASSES = ClassConfOption.create("giraph.mapper.observers", 
null, MapperObserver.class, "Classes for Mapper Observer - optional"); + /** Classes for GC Observer - optional */ + ClassConfOption<GcObserver> GC_OBSERVER_CLASSES = + ClassConfOption.create("giraph.gc.observers", null, + GcObserver.class, "Classes for GC oObserver - optional"); /** Message combiner class - optional */ ClassConfOption<MessageCombiner> MESSAGE_COMBINER_CLASS = ClassConfOption.create("giraph.messageCombinerClass", null, http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java index 680368b..dfd24d0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java @@ -76,6 +76,7 @@ import org.apache.giraph.utils.ExtendedByteArrayDataInput; import org.apache.giraph.utils.ExtendedByteArrayDataOutput; import org.apache.giraph.utils.ExtendedDataInput; import org.apache.giraph.utils.ExtendedDataOutput; +import org.apache.giraph.utils.GcObserver; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.UnsafeByteArrayInputStream; import org.apache.giraph.utils.UnsafeByteArrayOutputStream; @@ -787,6 +788,22 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable, } /** + * Create array of GcObservers. + * + * @param context Mapper context + * @return Instantiated array of GcObservers. + */ + public GcObserver[] createGcObservers( + Mapper<?, ?, ?, ?>.Context context) { + Class<? 
extends GcObserver>[] klasses = getGcObserverClasses(); + GcObserver[] objects = new GcObserver[klasses.length]; + for (int i = 0; i < klasses.length; ++i) { + objects[i] = ReflectionUtils.newInstance(klasses[i], this, context); + } + return objects; + } + + /** * Create job observer * * @return GiraphJobObserver set in configuration. http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index b0659bf..08b45a2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -58,6 +58,7 @@ import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.utils.CallableFactory; +import org.apache.giraph.utils.GcObserver; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; import org.apache.giraph.worker.BspServiceWorker; @@ -650,6 +651,7 @@ end[PURE_YARN]*/ * notifies an out-of-core engine (if any is used) about the GC. 
*/ private void installGCMonitoring() { + final GcObserver[] gcObservers = conf.createGcObservers(context); List<GarbageCollectorMXBean> mxBeans = ManagementFactory .getGarbageCollectorMXBeans(); final OutOfCoreEngine oocEngine = @@ -674,6 +676,9 @@ end[PURE_YARN]*/ } gcTimeMetric.inc(info.getGcInfo().getDuration()); GiraphMetrics.get().getGcTracker().gcOccurred(info.getGcInfo()); + for (GcObserver gcObserver : gcObservers) { + gcObserver.gcOccurred(info); + } if (oocEngine != null) { oocEngine.gcCompleted(info); } http://git-wip-us.apache.org/repos/asf/giraph/blob/8e6ec266/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java b/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java new file mode 100644 index 0000000..3337e42 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/GcObserver.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.giraph.utils;
+
+import com.sun.management.GarbageCollectionNotificationInfo;
+
+/**
+ * Observer for when GCs occur
+ */
+public interface GcObserver {
+  /**
+   * Called to notify that GC occurred
+   *
+   * @param gcInfo GC info
+   */
+  void gcOccurred(GarbageCollectionNotificationInfo gcInfo);
+}