Repository: tez Updated Branches: refs/heads/master 842abc17e -> 170af76fe
TEZ-1311. get sharedobjectregistry from the context instead of a static (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/170af76f Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/170af76f Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/170af76f Branch: refs/heads/master Commit: 170af76fee3cefe524ae651c00389ade647dd698 Parents: 842abc1 Author: Bikas Saha <[email protected]> Authored: Fri Jul 25 11:21:00 2014 -0700 Committer: Bikas Saha <[email protected]> Committed: Fri Jul 25 11:21:00 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + pom.xml | 5 --- tez-api/pom.xml | 4 -- .../apache/tez/runtime/api/TezTaskContext.java | 9 ++++ .../objectregistry/ObjectRegistryFactory.java | 32 --------------- tez-dag/pom.xml | 4 -- .../org/apache/tez/runtime/task/TezChild.java | 8 +--- .../apache/tez/runtime/task/TezTaskRunner.java | 8 ++-- .../tez/runtime/task/TestTaskExecution.java | 2 +- .../examples/BroadcastAndOneToOneExample.java | 5 +-- .../tez/mapreduce/examples/MRRSleepJob.java | 15 ------- .../tez/mapreduce/processor/MapUtils.java | 2 +- .../processor/reduce/TestReduceProcessor.java | 2 +- tez-runtime-internals/pom.xml | 4 -- .../runtime/LogicalIOProcessorRuntimeTask.java | 12 ++++-- .../runtime/api/impl/TezInputContextImpl.java | 8 +++- .../runtime/api/impl/TezOutputContextImpl.java | 8 +++- .../api/impl/TezProcessorContextImpl.java | 8 +++- .../runtime/api/impl/TezTaskContextImpl.java | 14 ++++++- .../objectregistry/ObjectRegistryModule.java | 43 -------------------- .../TestLogicalIOProcessorRuntimeTask.java | 4 +- .../objectregistry/TestObjectRegistry.java | 15 +------ .../output/TestOnFileUnorderedKVOutput.java | 2 +- 23 files changed, 64 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5ebb28d..699b291 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -32,6 +32,8 @@ INCOMPATIBLE CHANGES the input and output (bikas) TEZ-1312. rename vertex.addInput/Output() to vertex.addDataSource/Sink() (Chen He via bikas) + TEZ-1311. get sharedobjectregistry from the context instead of a static + (bikas)" Release 0.4.0-incubating: 2014-04-05 http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 83d5db6..0d44b09 100644 --- a/pom.xml +++ b/pom.xml @@ -575,11 +575,6 @@ <version>${protobuf.version}</version> </dependency> <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - <version>3.0</version> - </dependency> - <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> <version>1.0.4.1</version> http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/pom.xml ---------------------------------------------------------------------- diff --git a/tez-api/pom.xml b/tez-api/pom.xml index 0048762..e3c4e8d 100644 --- a/tez-api/pom.xml +++ b/tez-api/pom.xml @@ -46,10 +46,6 @@ <artifactId>commons-lang</artifactId> </dependency> <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java index 04ba05f..7b93c65 100644 --- a/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java +++ b/tez-api/src/main/java/org/apache/tez/runtime/api/TezTaskContext.java @@ -22,8 +22,10 @@ import java.nio.ByteBuffer; import java.util.List; import javax.annotation.Nullable; + import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; /** * Base interface for Context classes used to initialize the Input, Output @@ -107,6 +109,13 @@ public interface TezTaskContext { * @return a unique identifier */ public String getUniqueIdentifier(); + + /** + * Returns a shared {@link ObjectRegistry} to hold user objects in memory + * between tasks. + * @return {@link ObjectRegistry} + */ + public ObjectRegistry getObjectRegistry(); /** * Report a fatal error to the framework. This will cause the entire task to http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java b/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java deleted file mode 100644 index 3e409cd..0000000 --- a/tez-api/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.runtime.common.objectregistry; - -import com.google.inject.Inject; - -public class ObjectRegistryFactory { - - @Inject - private static ObjectRegistry objectRegistry; - - public static ObjectRegistry getObjectRegistry() { - return objectRegistry; - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/pom.xml ---------------------------------------------------------------------- diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml index d7106c3..6a0da79 100644 --- a/tez-dag/pom.xml +++ b/tez-dag/pom.xml @@ -118,10 +118,6 @@ <artifactId>junit</artifactId> </dependency> <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - </dependency> - <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java index 8c23722..131def0 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -62,7 +62,6 @@ import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule; import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils; import com.google.common.base.Function; @@ -74,8 +73,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Guice; -import com.google.inject.Injector; public class TezChild { @@ -207,7 +204,7 @@ public class TezChild { // Execute the Actual Task TezTaskRunner taskRunner = new TezTaskRunner(new TezConfiguration(defaultConf), childUGI, localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber, - serviceConsumerMetadata, startedInputsMap, taskReporter, executor); + serviceConsumerMetadata, startedInputsMap, taskReporter, executor, objectRegistry); boolean shouldDie = false; try { shouldDie = !taskRunner.run(); @@ -355,9 +352,8 @@ public class TezChild { // of this class. Leaving it here, till there's some entity representing a running JVM. DefaultMetricsSystem.initialize("TezTask"); + // singleton of ObjectRegistry for this JVM ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl(); - @SuppressWarnings("unused") - Injector injector = Guice.createInjector(new ObjectRegistryModule(objectRegistry)); TezChild tezChild = new TezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier, attemptNumber, localDirs, objectRegistry); http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java index fb770e6..f1ddc0b 100644 --- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java +++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java @@ -34,7 +34,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; -import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -43,6 +42,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; @@ -70,16 +70,16 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter { TezTaskRunner(Configuration tezConf, UserGroupInformation ugi, String[] localDirs, TaskSpec taskSpec, TezTaskUmbilicalProtocol umbilical, int appAttemptNumber, Map<String, ByteBuffer> serviceConsumerMetadata, Multimap<String, String> startedInputsMap, - TaskReporter taskReporter, ListeningExecutorService executor) throws IOException { + TaskReporter taskReporter, ListeningExecutorService executor, ObjectRegistry objectRegistry) + throws IOException { this.tezConf = tezConf; this.ugi = ugi; this.taskReporter = taskReporter; this.executor = executor; task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this, - serviceConsumerMetadata, startedInputsMap); + serviceConsumerMetadata, startedInputsMap, objectRegistry); taskReporter.registerTask(task, this); taskRunning = new AtomicBoolean(true); - } /** http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java index 0d1e509..98abf1c 100644 --- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java +++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java @@ -649,7 +649,7 @@ public class TestTaskExecution { TezTaskRunner taskRunner = new TezTaskRunner(tezConf, ugi, localDirs, taskSpec, umbilical, 1, new HashMap<String, ByteBuffer>(), HashMultimap.<String, String> create(), taskReporter, - executor); + executor, null); return taskRunner; } http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index e476a88..7e7d351 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -48,7 +48,6 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.mapreduce.hadoop.MRHelpers; import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.conf.UnorderedUnpartitionedKVEdgeConfigurer; @@ -72,7 +71,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { if (userPayload != null) { boolean doLocalityCheck = userPayload[0] > 0 ? true : false; if (doLocalityCheck) { - ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry(); + ObjectRegistry objectRegistry = getContext().getObjectRegistry(); String entry = String.valueOf(getContext().getTaskIndex()); objectRegistry.add(ObjectLifeCycle.DAG, entry, entry); } @@ -104,7 +103,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool { Preconditions.checkState((sum == expectedSum), "Sum = " + sum); if (doLocalityCheck) { - ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry(); + ObjectRegistry objectRegistry = getContext().getObjectRegistry(); String index = (String) objectRegistry.get(String.valueOf(getContext().getTaskIndex())); if (index == null || Integer.valueOf(index).intValue() != getContext().getTaskIndex()) { String msg = "Did not find expected local producer " http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 14fe441..ec49a1c 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -79,9 +79,6 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; import org.apache.tez.mapreduce.partition.MRPartitioner; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; -import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import com.google.common.annotations.VisibleForTesting; @@ -222,18 +219,6 @@ public class MRRSleepJob extends Configured implements Tool { TaskAttemptID taId = context.getTaskAttemptID(); - ObjectRegistry objectRegistry = ObjectRegistryFactory.getObjectRegistry(); - String fooBarVal = (String) objectRegistry.get("FooBar"); - if (null == fooBarVal) { - LOG.info("Adding FooBar key to Object cache"); - objectRegistry.add(ObjectLifeCycle.DAG, - "FooBar", "BarFooFromTask" + taId.getTaskID().toString()); - } else { - LOG.info("Got FooBar val from Object cache" - + ", currentTaskId=" + taId.getTaskID().toString() - + ", val=" + fooBarVal); - } - String[] taskIds = conf.getStrings(MAP_ERROR_TASK_IDS); if (taId.getId()+1 >= context.getMaxMapAttempts()) { finalAttempt = true; http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java index 69d6cb7..3731c64 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java @@ -222,7 +222,7 @@ public class MapUtils { new String[] {workDir.toString()}, umbilical, serviceConsumerMetadata, - HashMultimap.<String, String>create()); + HashMultimap.<String, String>create(), null); return task; } } http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java index c1798d3..5ea6eb5 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java @@ -184,7 +184,7 @@ public class TestReduceProcessor { new String[] {workDir.toString()}, new TestUmbilical(), serviceConsumerMetadata, - HashMultimap.<String, String>create()); + HashMultimap.<String, String>create(), null); task.initialize(); task.run(); http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/pom.xml ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/pom.xml b/tez-runtime-internals/pom.xml index f106997..779bb02 100644 --- a/tez-runtime-internals/pom.xml +++ b/tez-runtime-internals/pom.xml @@ -50,10 +50,6 @@ <artifactId>hadoop-yarn-common</artifactId> </dependency> <dependency> - <groupId>com.google.inject</groupId> - <artifactId>guice</artifactId> - </dependency> - <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index 52bc0a6..a72dafa 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -74,6 +74,7 @@ import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; import org.apache.tez.runtime.api.impl.TezOutputContextImpl; import org.apache.tez.runtime.api.impl.TezProcessorContextImpl; import org.apache.tez.runtime.api.impl.TezUmbilical; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.resources.MemoryDistributor; import com.google.common.annotations.VisibleForTesting; @@ -125,13 +126,15 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { private final int appAttemptNumber; private final InputReadyTracker inputReadyTracker; + + private final ObjectRegistry objectRegistry; // KKK Make sure LogicalInputFramework checks are in place public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, String[] localDirs, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, - Multimap<String, String> startedInputsMap) throws IOException { + Multimap<String, String> startedInputsMap, ObjectRegistry objectRegistry) throws IOException { // TODO Remove jobToken from here post TEZ-421 super(taskSpec, tezConf, tezUmbilical); LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " @@ -167,6 +170,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { initialMemoryDistributor = new MemoryDistributor(numInputs, numOutputs, tezConf); this.startedInputsMap = startedInputsMap; this.inputReadyTracker = new InputReadyTracker(); + this.objectRegistry = objectRegistry; } /** @@ -479,7 +483,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { tezCounters, inputIndex, inputSpec.getInputDescriptor().getUserPayload(), this, serviceConsumerMetadata, System.getenv(), initialMemoryDistributor, - inputSpec.getInputDescriptor(), input, inputReadyTracker); + inputSpec.getInputDescriptor(), input, inputReadyTracker, objectRegistry); return inputContext; } @@ -491,7 +495,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { tezCounters, outputIndex, outputSpec.getOutputDescriptor().getUserPayload(), this, serviceConsumerMetadata, System.getenv(), initialMemoryDistributor, - outputSpec.getOutputDescriptor()); + outputSpec.getOutputDescriptor(), objectRegistry); return outputContext; } @@ -502,7 +506,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { taskSpec.getTaskAttemptID(), tezCounters, processorDescriptor.getUserPayload(), this, serviceConsumerMetadata, System.getenv(), initialMemoryDistributor, - processorDescriptor, inputReadyTracker); + processorDescriptor, inputReadyTracker, objectRegistry); return processorContext; } http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index fed36ab..e98d694 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -19,13 +19,16 @@ package org.apache.tez.runtime.api.impl; import com.google.common.base.Preconditions; + import static com.google.common.base.Preconditions.checkNotNull; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.annotation.Nullable; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUserPayload; @@ -40,6 +43,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.resources.MemoryDistributor; public class TezInputContextImpl extends TezTaskContextImpl @@ -59,11 +63,11 @@ public class TezInputContextImpl extends TezTaskContextImpl TezCounters counters, int inputIndex, @Nullable byte[] userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, - InputDescriptor inputDescriptor, Input input, InputReadyTracker inputReadyTracker) { + InputDescriptor inputDescriptor, Input input, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, taskAttemptID, wrapCounters(counters, taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, - auxServiceEnv, memDist, inputDescriptor); + auxServiceEnv, memDist, inputDescriptor, objectRegistry); checkNotNull(inputIndex, "inputIndex is null"); checkNotNull(sourceVertexName, "sourceVertexName is null"); checkNotNull(input, "input is null"); http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 9a4d40b..cde4c0c 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -19,13 +19,16 @@ package org.apache.tez.runtime.api.impl; import com.google.common.base.Preconditions; + import static com.google.common.base.Preconditions.checkNotNull; + import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.annotation.Nullable; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.tez.common.TezUserPayload; @@ -38,6 +41,7 @@ import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.resources.MemoryDistributor; public class TezOutputContextImpl extends TezTaskContextImpl @@ -57,11 +61,11 @@ public class TezOutputContextImpl extends TezTaskContextImpl @Nullable byte[] userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, - OutputDescriptor outputDescriptor) { + OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, taskAttemptID, wrapCounters(counters, taskVertexName, destinationVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, - auxServiceEnv, memDist, outputDescriptor); + auxServiceEnv, memDist, outputDescriptor, objectRegistry); checkNotNull(outputIndex, "outputIndex is null"); checkNotNull(destinationVertexName, "destinationVertexName is null"); this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload); http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index d840dc0..9273cf3 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -19,7 +19,9 @@ package org.apache.tez.runtime.api.impl; import com.google.common.base.Preconditions; + import static com.google.common.base.Preconditions.checkNotNull; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -28,6 +30,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +45,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.resources.MemoryDistributor; public class TezProcessorContextImpl extends TezTaskContextImpl implements TezProcessorContext { @@ -58,10 +62,10 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements TezPr @Nullable byte[] userPayload, RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, - ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker) { + ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) { super(conf, workDirs, appAttemptNumber, dagName, vertexName, taskAttemptID, counters, runtimeTask, tezUmbilical, serviceConsumerMetadata, - auxServiceEnv, memDist, processorDescriptor); + auxServiceEnv, memDist, processorDescriptor, objectRegistry); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload); this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR, http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java index 8cbdaf6..b155cbc 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java @@ -19,12 +19,14 @@ package org.apache.tez.runtime.api.impl; import static com.google.common.base.Preconditions.checkNotNull; + import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -35,6 +37,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.TezTaskContext; +import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; import org.apache.tez.runtime.common.resources.MemoryDistributor; import com.google.common.base.Preconditions; @@ -55,8 +58,9 @@ public abstract class TezTaskContextImpl implements TezTaskContext { private final int appAttemptNumber; private final Map<String, String> auxServiceEnv; protected final MemoryDistributor initialMemoryDistributor; - protected final TezEntityDescriptor descriptor; + protected final TezEntityDescriptor<?> descriptor; private final String dagName; + private final ObjectRegistry objectRegistry; @Private public TezTaskContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, @@ -64,7 +68,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext { TezCounters counters, RuntimeTask runtimeTask, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata, Map<String, String> auxServiceEnv, MemoryDistributor memDist, - TezEntityDescriptor descriptor) { + TezEntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) { checkNotNull(conf, "conf is null"); checkNotNull(dagName, "dagName is null"); checkNotNull(taskVertexName, "taskVertexName is null"); @@ -92,6 +96,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext { generateId()); this.initialMemoryDistributor = memDist; this.descriptor = descriptor; + this.objectRegistry = objectRegistry; } @Override @@ -144,6 +149,11 @@ public abstract class TezTaskContextImpl implements TezTaskContext { public String getUniqueIdentifier() { return uniqueIdentifier; } + + @Override + public ObjectRegistry getObjectRegistry() { + return objectRegistry; + } @Override public ByteBuffer getServiceConsumerMetaData(String serviceName) { http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java deleted file mode 100644 index 4bbbfe2..0000000 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/objectregistry/ObjectRegistryModule.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tez.runtime.common.objectregistry; - -import com.google.common.annotations.VisibleForTesting; -import com.google.inject.AbstractModule; - -public class ObjectRegistryModule extends AbstractModule { - - private final ObjectRegistry objectRegistry; - - public ObjectRegistryModule(ObjectRegistry objectRegistry) { - this.objectRegistry = objectRegistry; - } - - @VisibleForTesting - public ObjectRegistryModule() { - objectRegistry = new ObjectRegistryImpl(); - } - - @Override - protected void configure() { - bind(ObjectRegistry.class).toInstance(this.objectRegistry); - requestStaticInjection(ObjectRegistryFactory.class); - } - -} http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index d2d8f29..a2876a7 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -70,7 +70,7 @@ public class TestLogicalIOProcessorRuntimeTask { TaskSpec task2 = createTaskSpec(taId2, "dag2", "vertex1"); LogicalIOProcessorRuntimeTask lio1 = new LogicalIOProcessorRuntimeTask(task1, 0, tezConf, null, - umbilical, serviceConsumerMetadata, startedInputsMap); + umbilical, serviceConsumerMetadata, startedInputsMap, null); lio1.initialize(); lio1.run(); @@ -82,7 +82,7 @@ public class TestLogicalIOProcessorRuntimeTask { assertEquals(0, TestOutput.startCount); LogicalIOProcessorRuntimeTask lio2 = new LogicalIOProcessorRuntimeTask(task2, 0, tezConf, null, - umbilical, serviceConsumerMetadata, startedInputsMap); + umbilical, serviceConsumerMetadata, startedInputsMap, null); lio2.initialize(); lio2.run(); http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java index 592cf97..332993b 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/common/objectregistry/TestObjectRegistry.java @@ -20,23 +20,11 @@ package org.apache.tez.runtime.common.objectregistry; import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory; -import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import com.google.inject.Guice; -import com.google.inject.Injector; - public class TestObjectRegistry { - @SuppressWarnings("unused") - @Before - public void setup() { - Injector injector = Guice.createInjector(new ObjectRegistryModule()); - } - private void testCRUD(ObjectRegistry objectRegistry) { Assert.assertNotNull(objectRegistry); @@ -58,8 +46,7 @@ public class TestObjectRegistry { @Test public void testBasicCRUD() { - ObjectRegistry objectRegistry = - ObjectRegistryFactory.getObjectRegistry(); + ObjectRegistry objectRegistry = new ObjectRegistryImpl(); testCRUD(objectRegistry); } http://git-wip-us.apache.org/repos/asf/tez/blob/170af76f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index 17ba5dc..fbe3b03 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -125,7 +125,7 @@ public class TestOnFileUnorderedKVOutput { TezOutputContext outputContext = new TezOutputContextImpl(conf, new String[] {workDir.toString()}, appAttemptNumber, tezUmbilical, dagName, taskVertexName, destinationVertexName, taskAttemptID, counters, 0, userPayload, runtimeTask, - null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor); + null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null); List<Event> events = null;
