Repository: tez
Updated Branches:
  refs/heads/master 842abc17e -> 170af76fe


TEZ-1311. get sharedobjectregistry from the context instead of a static (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/170af76f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/170af76f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/170af76f

Branch: refs/heads/master
Commit: 170af76fee3cefe524ae651c00389ade647dd698
Parents: 842abc1
Author: Bikas Saha <[email protected]>
Authored: Fri Jul 25 11:21:00 2014 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Jul 25 11:21:00 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 pom.xml                                         |  5 ---
 tez-api/pom.xml                                 |  4 --
 .../apache/tez/runtime/api/TezTaskContext.java  |  9 ++++
 .../objectregistry/ObjectRegistryFactory.java   | 32 ---------------
 tez-dag/pom.xml                                 |  4 --
 .../org/apache/tez/runtime/task/TezChild.java   |  8 +---
 .../apache/tez/runtime/task/TezTaskRunner.java  |  8 ++--
 .../tez/runtime/task/TestTaskExecution.java     |  2 +-
 .../examples/BroadcastAndOneToOneExample.java   |  5 +--
 .../tez/mapreduce/examples/MRRSleepJob.java     | 15 -------
 .../tez/mapreduce/processor/MapUtils.java       |  2 +-
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 tez-runtime-internals/pom.xml                   |  4 --
 .../runtime/LogicalIOProcessorRuntimeTask.java  | 12 ++++--
 .../runtime/api/impl/TezInputContextImpl.java   |  8 +++-
 .../runtime/api/impl/TezOutputContextImpl.java  |  8 +++-
 .../api/impl/TezProcessorContextImpl.java       |  8 +++-
 .../runtime/api/impl/TezTaskContextImpl.java    | 14 ++++++-
 .../objectregistry/ObjectRegistryModule.java    | 43 --------------------
 .../TestLogicalIOProcessorRuntimeTask.java      |  4 +-
 .../objectregistry/TestObjectRegistry.java      | 15 +------
 .../output/TestOnFileUnorderedKVOutput.java     |  2 +-
 23 files changed, 64 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5ebb28d..699b291 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -32,6 +32,8 @@ INCOMPATIBLE CHANGES
   the input and output (bikas)
   TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink()
   (Chen He via bikas)
+  TEZ-1311. get sharedobjectregistry from the context instead of a static
+  (bikas)"
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 83d5db6..0d44b09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -575,11 +575,6 @@
        <version>${protobuf.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.google.inject</groupId>
-        <artifactId>guice</artifactId>
-        <version>3.0</version>
-      </dependency>
-      <dependency>
         <groupId>org.xerial.snappy</groupId>
         <artifactId>snappy-java</artifactId>
         <version>1.0.4.1</version>

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-api/pom.xml b/tez-api/pom.xml
index 0048762..e3c4e8d 100644
--- a/tez-api/pom.xml
+++ b/tez-api/pom.xml
@@ -46,10 +46,6 @@
       <artifactId>commons-lang</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.inject</groupId>
-      <artifactId>guice</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java 
b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
index 04ba05f..7b93c65 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java
@@ -22,8 +22,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 
 /**
  * Base interface for Context classes used to initialize the Input, Output
@@ -107,6 +109,13 @@ public interface TezTaskContext {
    * @return a unique identifier
    */
   public String getUniqueIdentifier();
+  
+  /**
+   * Returns a shared {@link ObjectRegistry} to hold user objects in memory 
+   * between tasks. 
+   * @return {@link ObjectRegistry}
+   */
+  public ObjectRegistry getObjectRegistry();
 
   /**
    * Report a fatal error to the framework. This will cause the entire task to

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java
----------------------------------------------------------------------
diff --git 
a/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java
 
b/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java
deleted file mode 100644
index 3e409cd..0000000
--- 
a/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.common.objectregistry;
-
-import com.google.inject.Inject;
-
-public class ObjectRegistryFactory {
-
-  @Inject
-  private static ObjectRegistry objectRegistry;
-
-  public static ObjectRegistry getObjectRegistry() {
-    return objectRegistry;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index d7106c3..6a0da79 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -118,10 +118,6 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.inject</groupId>
-      <artifactId>guice</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java 
b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 8c23722..131def0 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -62,7 +62,6 @@ import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 
 import com.google.common.base.Function;
@@ -74,8 +73,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 
 public class TezChild {
 
@@ -207,7 +204,7 @@ public class TezChild {
         // Execute the Actual Task
         TezTaskRunner taskRunner = new TezTaskRunner(new 
TezConfiguration(defaultConf), childUGI,
             localDirs, containerTask.getTaskSpec(), umbilical, 
appAttemptNumber,
-            serviceConsumerMetadata, startedInputsMap, taskReporter, executor);
+            serviceConsumerMetadata, startedInputsMap, taskReporter, executor, 
objectRegistry);
         boolean shouldDie = false;
         try {
           shouldDie = !taskRunner.run();
@@ -355,9 +352,8 @@ public class TezChild {
     // of this class. Leaving it here, till there's some entity representing a 
running JVM.
     DefaultMetricsSystem.initialize("TezTask");
 
+    // singleton of ObjectRegistry for this JVM
     ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
-    @SuppressWarnings("unused")
-    Injector injector = Guice.createInjector(new 
ObjectRegistryModule(objectRegistry));
 
     TezChild tezChild = new TezChild(defaultConf, host, port, 
containerIdentifier, tokenIdentifier,
         attemptNumber, localDirs, objectRegistry);

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java 
b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index fb770e6..f1ddc0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
-import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -43,6 +42,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -70,16 +70,16 @@ public class TezTaskRunner implements TezUmbilical, 
ErrorReporter {
   TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] 
localDirs,
       TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int 
appAttemptNumber,
       Map<String, ByteBuffer> serviceConsumerMetadata, Multimap<String, 
String> startedInputsMap,
-      TaskReporter taskReporter, ListeningExecutorService executor) throws 
IOException {
+      TaskReporter taskReporter, ListeningExecutorService executor, 
ObjectRegistry objectRegistry) 
+          throws IOException {
     this.tezConf = tezConf;
     this.ugi = ugi;
     this.taskReporter = taskReporter;
     this.executor = executor;
     task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, 
tezConf, localDirs, this,
-        serviceConsumerMetadata, startedInputsMap);
+        serviceConsumerMetadata, startedInputsMap, objectRegistry);
     taskReporter.registerTask(task, this);
     taskRunning = new AtomicBoolean(true);
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java 
b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index 0d1e509..98abf1c 100644
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -649,7 +649,7 @@ public class TestTaskExecution {
 
     TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, 
taskSpec, umbilical, 1,
         new HashMap<String, ByteBuffer>(), HashMultimap.<String, String> 
create(), taskReporter,
-        executor);
+        executor, null);
     return taskRunner;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e476a88..7e7d351 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -48,7 +48,6 @@ import 
org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 import 
org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer;
@@ -72,7 +71,7 @@ public class BroadcastAndOneToOneExample extends Configured 
implements Tool {
       if (userPayload != null) {
         boolean doLocalityCheck = userPayload[0] > 0 ? true : false;
         if (doLocalityCheck) {
-          ObjectRegistry objectRegistry = 
ObjectRegistryFactory.getObjectRegistry();
+          ObjectRegistry objectRegistry = getContext().getObjectRegistry();
           String entry = String.valueOf(getContext().getTaskIndex());
           objectRegistry.add(ObjectLifeCycle.DAG, entry, entry);
         }
@@ -104,7 +103,7 @@ public class BroadcastAndOneToOneExample extends Configured 
implements Tool {
       Preconditions.checkState((sum == expectedSum), "Sum = " + sum);      
       
       if (doLocalityCheck) {
-        ObjectRegistry objectRegistry = 
ObjectRegistryFactory.getObjectRegistry();
+        ObjectRegistry objectRegistry = getContext().getObjectRegistry();
         String index = (String) 
objectRegistry.get(String.valueOf(getContext().getTaskIndex()));
         if (index == null || Integer.valueOf(index).intValue() != 
getContext().getTaskIndex()) {
           String msg = "Did not find expected local producer "

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 14fe441..ec49a1c 100644
--- 
a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ 
b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -79,9 +79,6 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -222,18 +219,6 @@ public class MRRSleepJob extends Configured implements 
Tool {
 
       TaskAttemptID taId = context.getTaskAttemptID();
 
-      ObjectRegistry objectRegistry = 
ObjectRegistryFactory.getObjectRegistry();
-      String fooBarVal = (String) objectRegistry.get("FooBar");
-      if (null == fooBarVal) {
-        LOG.info("Adding FooBar key to Object cache");
-        objectRegistry.add(ObjectLifeCycle.DAG,
-            "FooBar", "BarFooFromTask" + taId.getTaskID().toString());
-      } else {
-        LOG.info("Got FooBar val from Object cache"
-            + ", currentTaskId=" + taId.getTaskID().toString()
-            + ", val=" + fooBarVal);
-      }
-
       String[] taskIds = conf.getStrings(MAP_ERROR_TASK_IDS);
       if (taId.getId()+1 >= context.getMaxMapAttempts()) {
         finalAttempt = true;

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 69d6cb7..3731c64 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -222,7 +222,7 @@ public class MapUtils {
         new String[] {workDir.toString()},
         umbilical,
         serviceConsumerMetadata,
-        HashMultimap.<String, String>create());
+        HashMultimap.<String, String>create(), null);
     return task;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index c1798d3..5ea6eb5 100644
--- 
a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ 
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -184,7 +184,7 @@ public class TestReduceProcessor {
         new String[] {workDir.toString()},
         new TestUmbilical(),
         serviceConsumerMetadata,
-        HashMultimap.<String, String>create());
+        HashMultimap.<String, String>create(), null);
     
     task.initialize();
     task.run();

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml
index f106997..779bb02 100644
--- a/tez-runtime-internals/pom.xml
+++ b/tez-runtime-internals/pom.xml
@@ -50,10 +50,6 @@
       <artifactId>hadoop-yarn-common</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.inject</groupId>
-      <artifactId>guice</artifactId>
-    </dependency>
-    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 52bc0a6..a72dafa 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -74,6 +74,7 @@ import 
org.apache.tez.runtime.api.impl.TezMergedInputContextImpl;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -125,13 +126,15 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
   private final int appAttemptNumber;
 
   private final InputReadyTracker inputReadyTracker;
+  
+  private final ObjectRegistry objectRegistry;
 
   // KKK Make sure LogicalInputFramework checks are in place
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber,
       Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical,
       Map<String, ByteBuffer> serviceConsumerMetadata,
-      Multimap<String, String> startedInputsMap) throws IOException {
+      Multimap<String, String> startedInputsMap, ObjectRegistry 
objectRegistry) throws IOException {
     // TODO Remove jobToken from here post TEZ-421
     super(taskSpec, tezConf, tezUmbilical);
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
@@ -167,6 +170,7 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
     initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, 
tezConf);
     this.startedInputsMap = startedInputsMap;
     this.inputReadyTracker = new InputReadyTracker();
+    this.objectRegistry = objectRegistry;
   }
 
   /**
@@ -479,7 +483,7 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
         tezCounters, inputIndex,
         inputSpec.getInputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        inputSpec.getInputDescriptor(), input, inputReadyTracker);
+        inputSpec.getInputDescriptor(), input, inputReadyTracker, 
objectRegistry);
     return inputContext;
   }
 
@@ -491,7 +495,7 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
         tezCounters, outputIndex,
         outputSpec.getOutputDescriptor().getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        outputSpec.getOutputDescriptor());
+        outputSpec.getOutputDescriptor(), objectRegistry);
     return outputContext;
   }
 
@@ -502,7 +506,7 @@ public class LogicalIOProcessorRuntimeTask extends 
RuntimeTask {
         taskSpec.getTaskAttemptID(),
         tezCounters, processorDescriptor.getUserPayload(), this,
         serviceConsumerMetadata, System.getenv(), initialMemoryDistributor,
-        processorDescriptor, inputReadyTracker);
+        processorDescriptor, inputReadyTracker, objectRegistry);
     return processorContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index fed36ab..e98d694 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -19,13 +19,16 @@
 package org.apache.tez.runtime.api.impl;
 
 import com.google.common.base.Preconditions;
+
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUserPayload;
@@ -40,6 +43,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezInputContextImpl extends TezTaskContextImpl
@@ -59,11 +63,11 @@ public class TezInputContextImpl extends TezTaskContextImpl
       TezCounters counters, int inputIndex, @Nullable byte[] userPayload,
       RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      InputDescriptor inputDescriptor,  Input input, InputReadyTracker 
inputReadyTracker) {
+      InputDescriptor inputDescriptor,  Input input, InputReadyTracker 
inputReadyTracker, ObjectRegistry objectRegistry) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, 
taskAttemptID,
         wrapCounters(counters, taskVertexName, sourceVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, inputDescriptor);
+        auxServiceEnv, memDist, inputDescriptor, objectRegistry);
     checkNotNull(inputIndex, "inputIndex is null");
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(input, "input is null");

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 9a4d40b..cde4c0c 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -19,13 +19,16 @@
 package org.apache.tez.runtime.api.impl;
 
 import com.google.common.base.Preconditions;
+
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezUserPayload;
@@ -38,6 +41,7 @@ import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
@@ -57,11 +61,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       @Nullable byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      OutputDescriptor outputDescriptor) {
+      OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry) {
     super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, 
taskAttemptID,
         wrapCounters(counters, taskVertexName, destinationVertexName, conf),
         runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, outputDescriptor);
+        auxServiceEnv, memDist, outputDescriptor, objectRegistry);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index d840dc0..9273cf3 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -19,7 +19,9 @@
 package org.apache.tez.runtime.api.impl;
 
 import com.google.common.base.Preconditions;
+
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import javax.annotation.Nullable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +45,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl implements 
TezProcessorContext {
@@ -58,10 +62,10 @@ public class TezProcessorContextImpl extends 
TezTaskContextImpl implements TezPr
       @Nullable byte[] userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      ProcessorDescriptor processorDescriptor, InputReadyTracker 
inputReadyTracker) {
+      ProcessorDescriptor processorDescriptor, InputReadyTracker 
inputReadyTracker, ObjectRegistry objectRegistry) {
     super(conf, workDirs, appAttemptNumber, dagName, vertexName, taskAttemptID,
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
-        auxServiceEnv, memDist, processorDescriptor);
+        auxServiceEnv, memDist, processorDescriptor, objectRegistry);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 8cbdaf6..b155cbc 100644
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -19,12 +19,14 @@
 package org.apache.tez.runtime.api.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.Nullable;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -35,6 +37,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.TezTaskContext;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
 import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 import com.google.common.base.Preconditions;
@@ -55,8 +58,9 @@ public abstract class TezTaskContextImpl implements 
TezTaskContext {
   private final int appAttemptNumber;
   private final Map<String, String> auxServiceEnv;
   protected final MemoryDistributor initialMemoryDistributor;
-  protected final TezEntityDescriptor descriptor;
+  protected final TezEntityDescriptor<?> descriptor;
   private final String dagName;
+  private final ObjectRegistry objectRegistry;
 
   @Private
   public TezTaskContextImpl(Configuration conf, String[] workDirs, int 
appAttemptNumber,
@@ -64,7 +68,7 @@ public abstract class TezTaskContextImpl implements 
TezTaskContext {
       TezCounters counters, RuntimeTask runtimeTask,
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> 
serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      TezEntityDescriptor descriptor) {
+      TezEntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) {
     checkNotNull(conf, "conf is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(taskVertexName, "taskVertexName is null");
@@ -92,6 +96,7 @@ public abstract class TezTaskContextImpl implements 
TezTaskContext {
         generateId());
     this.initialMemoryDistributor = memDist;
     this.descriptor = descriptor;
+    this.objectRegistry = objectRegistry;
   }
 
   @Override
@@ -144,6 +149,11 @@ public abstract class TezTaskContextImpl implements 
TezTaskContext {
   public String getUniqueIdentifier() {
     return uniqueIdentifier;
   }
+  
+  @Override
+  public ObjectRegistry getObjectRegistry() {
+    return objectRegistry;
+  }
 
   @Override
   public ByteBuffer getServiceConsumerMetaData(String serviceName) {

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
 
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
deleted file mode 100644
index 4bbbfe2..0000000
--- 
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.common.objectregistry;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.inject.AbstractModule;
-
-public class ObjectRegistryModule extends AbstractModule {
-
-  private final ObjectRegistry objectRegistry;
-
-  public ObjectRegistryModule(ObjectRegistry objectRegistry) {
-    this.objectRegistry = objectRegistry;
-  }
-
-  @VisibleForTesting
-  public ObjectRegistryModule() {
-    objectRegistry = new ObjectRegistryImpl();
-  }
-
-  @Override
-  protected void configure() {
-    bind(ObjectRegistry.class).toInstance(this.objectRegistry);
-    requestStaticInjection(ObjectRegistryFactory.class);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index d2d8f29..a2876a7 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -70,7 +70,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1");
 
     LogicalIOProcessorRuntimeTask lio1 = new 
LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null,
-        umbilical, serviceConsumerMetadata, startedInputsMap);
+        umbilical, serviceConsumerMetadata, startedInputsMap, null);
 
     lio1.initialize();
     lio1.run();
@@ -82,7 +82,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     assertEquals(0, TestOutput.startCount);
 
     LogicalIOProcessorRuntimeTask lio2 = new 
LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null,
-        umbilical, serviceConsumerMetadata, startedInputsMap);
+        umbilical, serviceConsumerMetadata, startedInputsMap, null);
 
     lio2.initialize();
     lio2.run();

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
index 592cf97..332993b 100644
--- 
a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
+++ 
b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java
@@ -20,23 +20,11 @@ package org.apache.tez.runtime.common.objectregistry;
 
 import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
 public class TestObjectRegistry {
 
-  @SuppressWarnings("unused")
-  @Before
-  public void setup() {
-    Injector injector = Guice.createInjector(new ObjectRegistryModule());
-  }
-
   private void testCRUD(ObjectRegistry objectRegistry) {
     Assert.assertNotNull(objectRegistry);
 
@@ -58,8 +46,7 @@ public class TestObjectRegistry {
 
   @Test
   public void testBasicCRUD() {
-    ObjectRegistry objectRegistry =
-        ObjectRegistryFactory.getObjectRegistry();
+    ObjectRegistry objectRegistry = new ObjectRegistryImpl();
     testCRUD(objectRegistry);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index 17ba5dc..fbe3b03 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -125,7 +125,7 @@ public class TestOnFileUnorderedKVOutput {
     TezOutputContext outputContext = new TezOutputContextImpl(conf, new 
String[] {workDir.toString()},
         appAttemptNumber, tezUmbilical, dagName, taskVertexName, 
destinationVertexName,
         taskAttemptID, counters, 0, userPayload, runtimeTask,
-        null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor);
+        null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, 
null);
 
     List<Event> events = null;
 

Reply via email to