Repository: tez Updated Branches: refs/heads/branch-0.7 fd7c2fc13 -> e15faa56c
http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index fb8b530..92035e1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -202,11 +202,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { * @throws Exception */ public void initialize() throws Exception { - LOG.info("Initializing LogicalProcessorIORuntimeTask"); Preconditions.checkState(this.state.get() == State.NEW, "Already initialized"); this.state.set(State.INITED); - LOG.info("Creating processor" + ", processorClassName=" + processorDescriptor.getClassName()); this.processorContext = createProcessorContext(); this.processor = createProcessor(processorDescriptor.getClassName(), processorContext); @@ -406,7 +404,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { - LOG.info("Initializing Input using InputSpec: " + inputSpec); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing Input using InputSpec: " + inputSpec); + } String edgeName = inputSpec.getSourceVertexName(); InputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex); LogicalInput input = createInput(inputSpec, inputContext); @@ -414,13 +414,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { inputsMap.put(edgeName, input); inputContextMap.put(edgeName, inputContext); - LOG.info("Initializing Input with src edge: " + edgeName); List<Event> events = ((InputFrameworkInterface)input).initialize(); sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); initializedInputs.put(edgeName, input); - LOG.info("Initialized Input with src edge: " + edgeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Initialized Input with src edge: " + edgeName); + } return null; } } @@ -436,7 +437,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { - LOG.info("Starting Input with src edge: " + srcVertexName); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting Input with src edge: " + srcVertexName); + } + input.start(); LOG.info("Started Input with src edge: " + srcVertexName); return null; @@ -455,7 +459,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { @Override protected Void callInternal() throws Exception { - LOG.info("Initializing Output using OutputSpec: " + outputSpec); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing Output using OutputSpec: " + outputSpec); + } String edgeName = outputSpec.getDestinationVertexName(); OutputContext outputContext = createOutputContext(outputSpec, outputIndex); LogicalOutput output = createOutput(outputSpec, outputContext); @@ -463,13 +469,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { outputsMap.put(edgeName, output); outputContextMap.put(edgeName, outputContext); - LOG.info("Initializing Output with dest edge: " + edgeName); List<Event> events = ((OutputFrameworkInterface)output).initialize(); sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT, outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); initializedOutputs.put(edgeName, output); - LOG.info("Initialized Output with dest edge: " + edgeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Initialized Output with dest edge: " + edgeName); + } return null; } } @@ -486,7 +493,9 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) { groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size()); for (GroupInputSpec groupInputSpec : groupInputSpecs) { - LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing GroupInput using GroupInputSpec: " + groupInputSpec); + } MergedInputContext mergedInputContext = new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(), groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs); @@ -505,11 +514,12 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } private void initializeLogicalIOProcessor() throws Exception { - LOG.info("Initializing processor" + ", processorClassName=" - + processorDescriptor.getClassName()); + if (LOG.isDebugEnabled()) { + LOG.debug("Initializing processor" + ", processorClassName=" + + processorDescriptor.getClassName()); + } processor.initialize(); - LOG.info("Initialized processor" + ", processorClassName=" - + processorDescriptor.getClassName()); + LOG.info("Initialized processor"); } private InputContext createInputContext(Map<String, LogicalInput> inputMap, @@ -556,7 +566,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } private LogicalInput createInput(InputSpec inputSpec, InputContext inputContext) throws TezException { - LOG.info("Creating Input"); InputDescriptor inputDesc = inputSpec.getInputDescriptor(); Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(), new Class[]{InputContext.class, Integer.TYPE}, @@ -579,7 +588,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { } private LogicalOutput createOutput(OutputSpec outputSpec, OutputContext outputContext) throws TezException { - LOG.info("Creating Output"); OutputDescriptor outputDesc = outputSpec.getOutputDescriptor(); Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(), new Class[]{OutputContext.class, Integer.TYPE}, @@ -704,7 +712,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { if (e == null) { continue; } - // TODO TODONEWTEZ if (!handleEvent(e)) { LOG.warn("Stopping Event Router thread as failed to handle" + " event: " + e); @@ -798,7 +805,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { "Ignoring exception when closing input {}(cleanup). Exception class={}, message={}", srcVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor + LOG.info("Closed input for vertex={}, sourceVertex={}", processor .getContext().getTaskVertexName(), srcVertexName); } } @@ -816,7 +823,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { "Ignoring exception when closing output {}(cleanup). Exception class={}, message={}", destVertexName, e.getClass().getName(), e.getMessage()); } finally { - LOG.info("Close input for vertex={}, sourceVertex={}", processor + LOG.info("Closed input for vertex={}, sourceVertex={}", processor .getContext().getTaskVertexName(), destVertexName); } } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index 8d6466a..4431150 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -176,6 +176,8 @@ public class TezInputContextImpl extends TezTaskContextImpl super.close(); this.userPayload = null; this.inputReadyTracker = null; - LOG.info("Cleared TezInputContextImpl related information"); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleared TezInputContextImpl related information"); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java index 71e96db..1e5b6a5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java @@ -156,6 +156,8 @@ public class TezOutputContextImpl extends TezTaskContextImpl public void close() throws IOException { super.close(); this.userPayload = null; - LOG.info("Cleared TezOutputContextImpl related information"); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleared TezOutputContextImpl related information"); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java index a191ae8..6dc30ff 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java @@ -121,7 +121,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce super.close(); this.userPayload = null; this.inputReadyTracker = null; - LOG.info("Cleared TezProcessorContextImpl related information"); + if (LOG.isDebugEnabled()) { + LOG.debug("Cleared TezProcessorContextImpl related information"); + } } } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java index 2622b1f..c822357 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java @@ -61,6 +61,7 @@ public class MemoryDistributor { private long totalJvmMemory; private final boolean isEnabled; + private final String allocatorClassName; private final Set<TaskContext> dupSet = Collections .newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>()); private final List<RequestorInfo> requestList; @@ -77,7 +78,13 @@ public class MemoryDistributor { this.conf = conf; isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT); - + + if (isEnabled) { + allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS, + TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT); + } else { + allocatorClassName = null; + } this.numTotalInputs = numTotalInputs; this.numTotalOutputs = numTotalOutputs; @@ -85,7 +92,8 @@ public class MemoryDistributor { this.requestList = Collections.synchronizedList(new LinkedList<RequestorInfo>()); LOG.info("InitialMemoryDistributor (isEnabled=" + isEnabled + ") invoked with: numInputs=" + numTotalInputs + ", numOutputs=" + numTotalOutputs - + ", JVM.maxFree=" + totalJvmMemory); + + ", JVM.maxFree=" + totalJvmMemory + + ", allocatorClassName=" + allocatorClassName); } @@ -97,7 +105,7 @@ public class MemoryDistributor { TaskContext taskContext, EntityDescriptor<?> descriptor) { registerRequest(requestSize, callback, taskContext, descriptor); } - + /** * Used by the Tez framework to distribute initial memory after components * have made their initial requests. @@ -106,6 +114,9 @@ public class MemoryDistributor { public void makeInitialAllocations() throws TezException { Preconditions.checkState(numInputsSeen.get() == numTotalInputs, "All inputs are expected to ask for memory"); Preconditions.checkState(numOutputsSeen.get() == numTotalOutputs, "All outputs are expected to ask for memory"); + + logInitialRequests(requestList); + Iterable<InitialMemoryRequestContext> requestContexts = Iterables.transform(requestList, new Function<RequestorInfo, InitialMemoryRequestContext>() { public InitialMemoryRequestContext apply(RequestorInfo requestInfo) { @@ -121,14 +132,12 @@ public class MemoryDistributor { } }); } else { - String allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS, - TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS_DEFAULT); - LOG.info("Using Allocator class: " + allocatorClassName); InitialMemoryAllocator allocator = ReflectionUtils.createClazzInstance(allocatorClassName); allocator.setConf(conf); allocations = allocator.assignMemory(totalJvmMemory, numTotalInputs, numTotalOutputs, Iterables.unmodifiableIterable(requestContexts)); validateAllocations(allocations, requestList.size()); + logFinalAllocations(allocations, requestList); } // Making the callbacks directly for now, instead of spawning threads. The @@ -137,14 +146,18 @@ public class MemoryDistributor { Iterator<Long> allocatedIter = allocations.iterator(); for (RequestorInfo rInfo : requestList) { long allocated = allocatedIter.next(); - LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", " - + rInfo.getRequestContext().getComponentVertexName() + ", " - + rInfo.getRequestContext().getComponentClassName() + ": requested=" - + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated); + if (LOG.isDebugEnabled()) { + LOG.info("Informing: " + rInfo.getRequestContext().getComponentType() + ", " + + rInfo.getRequestContext().getComponentVertexName() + ", " + + rInfo.getRequestContext().getComponentClassName() + ": requested=" + + rInfo.getRequestContext().getRequestedSize() + ", allocated=" + allocated); + } rInfo.getCallback().memoryAssigned(allocated); } } + + /** * Allow tests to set memory. * @param size @@ -233,8 +246,6 @@ public class MemoryDistributor { this.requestContext = new InitialMemoryRequestContext(requestSize, descriptor.getClassName(), type, componentVertexName); this.callback = callback; - LOG.info("Received request: " + requestSize + ", type: " + type + ", componentVertexName: " - + componentVertexName); } public MemoryUpdateCallback getCallback() { @@ -246,4 +257,45 @@ public class MemoryDistributor { } } + + private void logInitialRequests(List<RequestorInfo> initialRequests) { + if (initialRequests != null && !initialRequests.isEmpty()) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < initialRequests.size(); i++) { + InitialMemoryRequestContext context = initialRequests.get(i).getRequestContext(); + sb.append("["); + sb.append(context.getComponentVertexName()).append(":"); + sb.append(context.getComponentType()).append(":"); + sb.append(context.getRequestedSize()).append(":").append(context.getComponentClassName()); + sb.append("]"); + if (i < initialRequests.size() - 1) { + sb.append(", "); + } + } + LOG.info("InitialRequests=" + sb.toString()); + } + } + + private void logFinalAllocations(Iterable<Long> allocations, List<RequestorInfo> requestList) { + if (requestList != null && !requestList.isEmpty()) { + Iterator<Long> allocatedIter = allocations.iterator(); + StringBuilder sb = new StringBuilder(); + + for (int i = 0 ; i < requestList.size() ; i++) { + long allocated = allocatedIter.next(); + InitialMemoryRequestContext context = requestList.get(i).getRequestContext(); + sb.append("["); + sb.append(context.getComponentVertexName()).append(":"); + sb.append(context.getComponentClassName()).append(":"); + sb.append(context.getComponentType()).append(":"); + sb.append(context.getRequestedSize()).append(":").append(allocated); + sb.append("]"); + if (i < requestList.size() - 1) { + sb.append(", "); + } + } + LOG.info("Allocations=" + sb.toString()); + } + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java index ebb94c6..2472c51 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java @@ -44,7 +44,7 @@ import org.apache.tez.dag.api.TezConfiguration; public class TaskCounterUpdater { private static final Logger LOG = LoggerFactory.getLogger(TaskCounterUpdater.class); - + private final TezCounters tezCounters; private final Configuration conf; @@ -149,6 +149,6 @@ public class TaskCounterUpdater { pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pid, clazz, conf); - LOG.info(" Using ResourceCalculatorProcessTree : " + pTree); + LOG.info("Using ResourceCalculatorProcessTree : " + clazz.getName()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java index 8ee30c5..0ece227 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java @@ -40,7 +40,7 @@ public class ContainerReporter extends CallableWithNdc<ContainerTask> { private final TezTaskUmbilicalProtocol umbilical; private final ContainerContext containerContext; private final int getTaskMaxSleepTime; - private final long LOG_INTERVAL = 2000l; + private final long LOG_INTERVAL = 30000l; private long nextGetTaskPrintTime; http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index 062b497..7f03992 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -287,10 +287,10 @@ public class TezChild { Preconditions.checkState(!containerTask.shouldDie()); Preconditions.checkState(containerTask.getTaskSpec() != null); if (containerTask.haveCredentialsChanged()) { - LOG.info("Refreshing UGI since Credentials have changed"); Credentials taskCreds = containerTask.getCredentials(); if (taskCreds != null) { - LOG.info("Credentials : #Tokens=" + taskCreds.numberOfTokens() + ", #SecretKeys=" + LOG.info("Refreshing UGI since Credentials have changed. Credentials : #Tokens=" + + taskCreds.numberOfTokens() + ", #SecretKeys=" + taskCreds.numberOfSecretKeys()); childUGI = UserGroupInformation.createRemoteUser(user); childUGI.addCredentials(containerTask.getCredentials()); @@ -315,20 +315,20 @@ public class TezChild { LOG.debug("Additional Resources added to container: " + additionalResources); } - LOG.info("Localizing additional local resources for Task : " + additionalResources); - List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources( - Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() { - @Override - public URI apply(TezLocalResource input) { - return input.getUri(); - } - }), defaultConf, workingDir); - RelocalizationUtils.addUrlsToClassPath(downloadedUrls); - LOG.info("Done localizing additional resources"); - final TaskSpec taskSpec = containerTask.getTaskSpec(); - if (LOG.isDebugEnabled()) { - LOG.debug("New container task context:" + taskSpec.toString()); + if (additionalResources != null && !additionalResources.isEmpty()) { + LOG.info("Localizing additional local resources for Task : " + additionalResources); + + List<URL> downloadedUrls = RelocalizationUtils.processAdditionalResources( + Maps.transformValues(additionalResources, new Function<TezLocalResource, URI>() { + @Override + public URI apply(TezLocalResource input) { + return input.getUri(); + } + }), defaultConf, workingDir); + RelocalizationUtils.addUrlsToClassPath(downloadedUrls); + + LOG.info("Done localizing additional resources"); } } @@ -456,7 +456,8 @@ public class TezChild { final Configuration defaultConf = new Configuration(); Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); - LOG.info("TezChild starting"); + final String pid = System.getenv().get("JVM_PID"); + assert args.length == 5; String host = args[0]; @@ -466,8 +467,7 @@ public class TezChild { final int attemptNumber = Integer.parseInt(args[4]); final String[] localDirs = TezCommonUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS .name())); - final String pid = System.getenv().get("JVM_PID"); - LOG.info("PID, containerIdentifier: " + pid + ", " + containerIdentifier); + LOG.info("TezChild starting with PID=" + pid + ", containerIdentifier=" + containerIdentifier); if (LOG.isDebugEnabled()) { LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port + " containerIdentifier: " + containerIdentifier + " appAttemptNumber: " + attemptNumber http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index ad0eaf9..69436ba 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -62,7 +62,6 @@ public class TezRuntimeUtils { Class<? extends Combiner> clazz; String className = conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); if (className == null) { - LOG.info("No combiner specified via " + TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS + ". Combiner will not be used"); return null; } LOG.info("Using Combiner class: " + className); http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java index b70c9d7..8477300 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java @@ -168,7 +168,6 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator Request request = new Request(context.getComponentClassName(), context.getRequestedSize(), requestType, typeScaleFactor); requests.add(request); - LOG.info("ScaleFactor: " + typeScaleFactor + ", for type: " + requestType); numRequestsScaled += typeScaleFactor; } @@ -194,7 +193,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT; } else { requestType = RequestType.OTHER; - LOG.info("Falling back to RequestType.OTHER for class: " + className); + if (LOG.isDebugEnabled()) { + LOG.debug("Falling back to RequestType.OTHER for class: " + className); + } } return requestType; } @@ -219,6 +220,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator } } + StringBuilder sb = new StringBuilder(); Set<RequestType> seenTypes = new HashSet<RequestType>(); for (String ratio : ratios) { @@ -232,7 +234,9 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator } Preconditions.checkState(ratioVal >= 0, "Ratio must be >= 0"); typeScaleMap.put(requestType, ratioVal); + sb.append("[").append(requestType).append(":").append(ratioVal).append("]"); } + LOG.info("ScaleRatiosUsed=" + sb.toString()); } private double computeReservedFraction(int numTotalRequests) { http://git-wip-us.apache.org/repos/asf/tez/blob/e15faa56/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java index 6b8bd0d..bf3e9db 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java @@ -151,9 +151,7 @@ public class RPCLoadGen extends TezExampleBase { random.nextBytes(diskPayload); fs = FileSystem.get(conf); resourcePath = new Path(Path.SEPARATOR + "tmp", DISK_PAYLOAD_NAME); - System.err.println("ZZZ: HDFSPath: " + resourcePath); resourcePath = fs.makeQualified(resourcePath); - System.err.println("ZZZ: HDFSPathResolved: " + resourcePath); FSDataOutputStream dataOut = fs.create(resourcePath, true); dataOut.write(diskPayload); dataOut.close();
