Repository: giraph
Updated Branches:
  refs/heads/trunk 833ccb78e -> c4cb42f7d


GIRAPH-1114: Expose StatusReporter from workers in blocks framework

Summary: Sometimes we need to call progress or update status from workers, 
expose this functionality

Test Plan: verify

Differential Revision: https://reviews.facebook.net/D63999


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c4cb42f7
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c4cb42f7
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c4cb42f7

Branch: refs/heads/trunk
Commit: c4cb42f7d77138beb79d66c70c8b42448078d992
Parents: 833ccb7
Author: Maja Kabiljo <majakabi...@fb.com>
Authored: Wed Sep 14 16:35:22 2016 -0700
Committer: Maja Kabiljo <majakabi...@fb.com>
Committed: Mon Sep 19 12:58:30 2016 -0700

----------------------------------------------------------------------
 .../block_app/framework/api/BlockWorkerApi.java |  3 +-
 .../framework/api/BlockWorkerContextApi.java    |  2 +-
 .../api/giraph/BlockMasterApiWrapper.java       | 15 ++--------
 .../api/giraph/BlockWorkerApiWrapper.java       | 17 +++++++++++
 .../giraph/BlockWorkerContextApiWrapper.java    | 17 +++++++++++
 .../framework/api/local/InternalApi.java        | 23 +++++++++------
 .../framework/internal/BlockCounters.java       | 30 ++++++++++++++++++++
 7 files changed, 84 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
index f6b3551..2e0aea0 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java
@@ -30,7 +30,8 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public interface BlockWorkerApi<I extends WritableComparable>
-    extends BlockApi, BlockOutputApi, AggregatorUsage, WorkerIndexUsage<I> {
+    extends BlockApi, BlockOutputApi, AggregatorUsage, WorkerIndexUsage<I>,
+    StatusReporter {
   @Override
   ImmutableClassesGiraphConfiguration<I, ?, ?> getConf();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
index 699f76a..1dee917 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java
@@ -29,5 +29,5 @@ import org.apache.hadoop.io.WritableComparable;
  */
 @SuppressWarnings("rawtypes")
 public interface BlockWorkerContextApi<I extends WritableComparable>
-    extends AggregatorUsage, BlockApi, WorkerIndexUsage<I> {
+    extends AggregatorUsage, BlockApi, WorkerIndexUsage<I>, StatusReporter {
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
index 55d3693..3b959a8 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java
@@ -22,6 +22,7 @@ import 
org.apache.giraph.block_app.framework.api.BlockMasterApi;
 import org.apache.giraph.block_app.framework.api.BlockOutputApi;
 import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor;
 import org.apache.giraph.block_app.framework.api.Counter;
+import org.apache.giraph.block_app.framework.internal.BlockCounters;
 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
 import org.apache.giraph.block_app.framework.output.BlockOutputHandle;
 import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
@@ -64,19 +65,7 @@ final class BlockMasterApiWrapper implements BlockMasterApi,
 
   @Override
   public Counter getCounter(String group, String name) {
-    final org.apache.hadoop.mapreduce.Counter counter =
-        master.getContext().getCounter(group, name);
-    return new Counter() {
-      @Override
-      public void increment(long incr) {
-        counter.increment(incr);
-      }
-
-      @Override
-      public void setValue(long value) {
-        counter.setValue(value);
-      }
-    };
+    return BlockCounters.getCounter(master.getContext(), group, name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
index 7bea80a..9a2b398 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java
@@ -24,6 +24,8 @@ import 
org.apache.giraph.block_app.framework.api.BlockOutputApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
+import org.apache.giraph.block_app.framework.api.Counter;
+import org.apache.giraph.block_app.framework.internal.BlockCounters;
 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
 import org.apache.giraph.block_app.framework.output.BlockOutputWriter;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -192,4 +194,19 @@ final class BlockWorkerApiWrapper<I extends 
WritableComparable,
   public int getWorkerForVertex(I vertexId) {
     return worker.getWorkerForVertex(vertexId);
   }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    return BlockCounters.getCounter(worker.getContext(), group, name);
+  }
+
+  @Override
+  public void progress() {
+    worker.getContext().progress();
+  }
+
+  @Override
+  public void setStatus(String status) {
+    worker.getContext().setStatus(status);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
index 7336bd4..744566b 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java
@@ -19,6 +19,8 @@ package org.apache.giraph.block_app.framework.api.giraph;
 
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi;
 import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi;
+import org.apache.giraph.block_app.framework.api.Counter;
+import org.apache.giraph.block_app.framework.internal.BlockCounters;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.io.Writable;
@@ -89,4 +91,19 @@ final class BlockWorkerContextApiWrapper
   public long getTotalNumVertices() {
     return workerContext.getTotalNumVertices();
   }
+
+  @Override
+  public Counter getCounter(String group, String name) {
+    return BlockCounters.getCounter(workerContext.getContext(), group, name);
+  }
+
+  @Override
+  public void progress() {
+    workerContext.getContext().progress();
+  }
+
+  @Override
+  public void setStatus(String status) {
+    workerContext.getContext().setStatus(status);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
index a8d5ef7..56adc47 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -45,6 +45,7 @@ import 
org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor;
 import org.apache.giraph.block_app.framework.api.Counter;
 import 
org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalChecksMessageStore;
 import 
org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalWrappedMessageStore;
+import org.apache.giraph.block_app.framework.internal.BlockCounters;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic;
 import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces;
 import org.apache.giraph.block_app.framework.output.BlockOutputDesc;
@@ -243,6 +244,19 @@ class InternalApi<I extends WritableComparable, V extends 
Writable,
     public <OW extends BlockOutputWriter> OW getWriter(String confOption) {
       return workerContextLogic.getOutputHandle().getWriter(confOption);
     }
+
+    @Override
+    public void setStatus(String status) {
+    }
+
+    @Override
+    public void progress() {
+    }
+
+    @Override
+    public Counter getCounter(final String group, final String name) {
+      return BlockCounters.getNoOpCounter();
+    }
   }
 
   @Override
@@ -316,14 +330,7 @@ class InternalApi<I extends WritableComparable, V extends 
Writable,
 
   @Override
   public Counter getCounter(final String group, final String name) {
-    return new Counter() {
-      @Override
-      public void increment(long incr) {
-      }
-      @Override
-      public void setValue(long value) {
-      }
-    };
+    return BlockCounters.getNoOpCounter();
   }
 
   private VertexMutations<I, V, E> getMutationFor(I vertexId) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/c4cb42f7/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
index da28537..8a0881f 100644
--- 
a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
+++ 
b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java
@@ -19,9 +19,12 @@ package org.apache.giraph.block_app.framework.internal;
 
 import java.lang.reflect.Field;
 
+import org.apache.giraph.block_app.framework.api.Counter;
 import org.apache.giraph.block_app.framework.api.StatusReporter;
 import 
org.apache.giraph.block_app.framework.internal.BlockMasterLogic.TimeStatsPerEvent;
 
+import org.apache.hadoop.mapreduce.Mapper;
+
 /** Utility class for Blocks Framework related counters */
 public class BlockCounters {
   public static final String GROUP = "Blocks Framework";
@@ -83,4 +86,31 @@ public class BlockCounters {
     ).setValue(millis / 1000);
     timeStats.inc(name, millis);
   }
+
+  public static Counter getCounter(
+      Mapper.Context context, String group, String name) {
+    final org.apache.hadoop.mapreduce.Counter counter =
+        context.getCounter(group, name);
+    return new Counter() {
+      @Override
+      public void increment(long incr) {
+        counter.increment(incr);
+      }
+
+      @Override
+      public void setValue(long value) {
+        counter.setValue(value);
+      }
+    };
+  }
+
+  public static Counter getNoOpCounter() {
+    return new Counter() {
+      @Override
+      public void setValue(long value) { }
+
+      @Override
+      public void increment(long incr) { }
+    };
+  }
 }

Reply via email to