Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Thu Nov 27 12:49:54 2014 @@ -23,10 +23,11 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.PigConfiguration; +import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; @@ -41,40 +43,64 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.executionengine.Launcher; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.LoaderProcessor; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.NoopFilterRemover; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.ParallelismSetter; -import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerPrinter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.AccumulatorOptimizer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.CombinerOptimizer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.LoaderProcessor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.MultiQueryOptimizerTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.NoopFilterRemover; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.SecondaryKeyOptimizerTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.UnionOptimizer; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.CompilationMessageCollector; import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.LogUtils; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.pig.tools.pigstats.tez.TezScriptState; -import org.apache.pig.tools.pigstats.tez.TezStats; -import org.apache.pig.tools.pigstats.tez.TezTaskStats; -import org.apache.tez.common.TezUtils; +import org.apache.pig.tools.pigstats.tez.TezVertexStats; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * Main class that launches pig for Tez */ public class TezLauncher extends Launcher { private static final Log log = LogFactory.getLog(TezLauncher.class); - private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1); + private static ThreadFactory namedThreadFactory; + private ExecutorService executor; private boolean aggregateWarning = false; private TezScriptState tezScriptState; - private TezStats tezStats; + private TezPigScriptStats tezStats; private TezJob runningJob; + public TezLauncher() { + if (namedThreadFactory == null) { + namedThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("PigTezLauncher-%d") + .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler()) + .build(); + } + executor = Executors.newSingleThreadExecutor(namedThreadFactory); + } + @Override public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception { if (pc.getExecType().isLocal()) { @@ -83,7 +109,7 @@ public class TezLauncher extends Launche pc.getProperties().setProperty("tez.ignore.lib.uris", "true"); } Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); - if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.TEZ_AUTO_PARALLELISM, true)) { + if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) { pc.defaultParallel = 1; } aggregateWarning = conf.getBoolean("aggregate.warning", false); @@ -98,60 +124,52 @@ public class TezLauncher extends Launche List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>(); tezScriptState = TezScriptState.get(); - tezStats = new TezStats(pc); + tezStats = new TezPigScriptStats(pc); PigStats.start(tezStats); conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); TezJobCompiler jc = new TezJobCompiler(pc, conf); TezPlanContainer tezPlanContainer = compile(php, pc); - int defaultTimeToSleep = pc.getExecType().isLocal() ? 100 : 1000; - int timeToSleep = conf.getInt("pig.jobcontrol.sleep", defaultTimeToSleep); - if (timeToSleep != defaultTimeToSleep) { - log.info("overriding default JobControl sleep (" + - defaultTimeToSleep + ") to " + timeToSleep); - } + tezStats.initialize(tezPlanContainer); + tezScriptState.emitInitialPlanNotification(tezPlanContainer); + tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch + TezPlanContainerNode tezPlanContainerNode; TezOperPlan tezPlan; - while ((tezPlan=tezPlanContainer.getNextPlan(processedPlans)) != null) { - optimize(tezPlan, pc); + int processedDAGs = 0; + while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) { + tezPlan = tezPlanContainerNode.getTezOperPlan(); + processLoadAndParallelism(tezPlan, pc); processedPlans.add(tezPlan); - ProgressReporter reporter = new ProgressReporter(); - tezStats.initialize(tezPlan); + ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs); if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) { // Native Tez Plan NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0); - tezScriptState.emitInitialPlanNotification(tezPlan); - tezScriptState.emitLaunchStartedNotification(tezPlan.size()); tezScriptState.emitJobsSubmittedNotification(1); - nativeOper.runJob(); + nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString()); } else { TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan); pkgAnnotator.visit(); - runningJob = jc.compile(tezPlan, grpName, tezPlanContainer); - - tezScriptState.emitInitialPlanNotification(tezPlan); - tezScriptState.emitLaunchStartedNotification(tezPlan.size()); - + runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer); + //TODO: Exclude vertex groups from numVerticesToLaunch ?? + tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size()); + runningJob.setPigStats(tezStats); + // Set the thread UDFContext so registered classes are available. final UDFContext udfContext = UDFContext.getUDFContext(); - Thread task = new Thread(runningJob) { + Runnable task = new Runnable() { @Override public void run() { + Thread.currentThread().setContextClassLoader(PigContext.getClassLoader()); UDFContext.setUdfContext(udfContext.clone()); - super.run(); + runningJob.run(); } }; - - JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler(); - task.setUncaughtExceptionHandler(jctExceptionHandler); - task.setContextClassLoader(PigContext.getClassLoader()); - - tezStats.setTezJob(runningJob); - + // Mark the times that the jobs were submitted so it's reflected in job - // history props + // history props. TODO: Fix this. unused now long scriptSubmittedTimestamp = System.currentTimeMillis(); // Job.getConfiguration returns the shared configuration object Configuration jobConf = runningJob.getConfiguration(); @@ -159,20 +177,42 @@ public class TezLauncher extends Launche Long.toString(scriptSubmittedTimestamp)); jobConf.set("pig.job.submitted.timestamp", Long.toString(System.currentTimeMillis())); - - Future<?> future = executor.schedule(task, timeToSleep, TimeUnit.MILLISECONDS); - + + Future<?> future = executor.submit(task); tezScriptState.emitJobsSubmittedNotification(1); - reporter.notifyStarted(); - + + boolean jobStarted = false; + while (!future.isDone()) { + if (!jobStarted && runningJob.getApplicationId() != null) { + jobStarted = true; + String appId = runningJob.getApplicationId().toString(); + //For Oozie Pig action job id matching compatibility with MR mode + log.info("HadoopJobId: "+ appId.replace("application", "job")); + tezScriptState.emitJobStartedNotification(appId); + tezScriptState.dagStartedNotification(runningJob.getName(), appId); + } reporter.notifyUpdate(); Thread.sleep(1000); } - - tezStats.accumulateStats(runningJob); + // For tez_local mode where PigProcessor destroys all UDFContext + UDFContext.setUdfContext(udfContext); + try { + // In case of FutureTask there is no uncaught exception + // Need to do future.get() to get any exception + future.get(); + } catch (ExecutionException e) { + setJobException(e.getCause()); + } } - tezScriptState.emitProgressUpdatedNotification(100); + processedDAGs++; + if (tezPlanContainer.size() == processedDAGs) { + tezScriptState.emitProgressUpdatedNotification(100); + } else { + tezScriptState.emitProgressUpdatedNotification( + ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100); + } + handleUnCaughtException(pc); tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed()); } @@ -203,6 +243,28 @@ public class TezLauncher extends Launche return tezStats; } + private void handleUnCaughtException(PigContext pc) throws Exception { + //check for the uncaught exceptions from TezJob thread + //if the job controller fails before launching the jobs then there are + //no jobs to check for failure + if (jobControlException != null) { + if (jobControlException instanceof PigException) { + if (jobControlExceptionStackTrace != null) { + LogUtils.writeLog("Error message from Tez Job", + jobControlExceptionStackTrace, pc + .getProperties().getProperty( + "pig.logfile"), log); + } + throw jobControlException; + } else { + int errCode = 2117; + String msg = "Unexpected error when launching Tez job."; + throw new ExecException(msg, errCode, PigException.BUG, + jobControlException); + } + } + } + private void computeWarningAggregate(Map<String, Map<String, Long>> counterGroups, Map<Enum, Long> aggMap) { for (Map<String, Long> counters : counterGroups.values()) { for (Enum e : PigWarning.values()) { @@ -223,53 +285,57 @@ public class TezLauncher extends Launche } private class ProgressReporter { + private int totalDAGs; + private int processedDAGS; private int count = 0; private int prevProgress = 0; - public void notifyStarted() throws IOException { - for (Vertex v : runningJob.getDAG().getVertices()) { - TezTaskStats tts = tezStats.getVertexStats(v.getName()); - UserPayload payload = v.getProcessorDescriptor().getUserPayload(); - Configuration conf = TezUtils.createConfFromUserPayload(payload); - tts.setConf(conf); - tts.setId(v.getName()); - tezScriptState.emitJobStartedNotification(v.getName()); - } + public ProgressReporter(int totalDAGs, int processedDAGs) { + this.totalDAGs = totalDAGs; + this.processedDAGS = processedDAGs; } public void notifyUpdate() { DAGStatus dagStatus = runningJob.getDAGStatus(); if (dagStatus != null && dagStatus.getState() == DAGStatus.State.RUNNING) { // Emit notification when the job has progressed more than 1%, - // or every 10 second + // or every 20 seconds int currProgress = Math.round(runningJob.getDAGProgress() * 100f); if (currProgress - prevProgress >= 1 || count % 100 == 0) { - tezScriptState.emitProgressUpdatedNotification(currProgress); + tezScriptState.dagProgressNotification(runningJob.getName(), -1, currProgress); + tezScriptState.emitProgressUpdatedNotification((currProgress + (100 * processedDAGS))/totalDAGs); prevProgress = currProgress; } count++; } + // TODO: Add new vertex tracking methods to PigTezProgressNotificationListener + // and emit notifications for individual vertex start, progress and completion } public boolean notifyFinishedOrFailed() { DAGStatus dagStatus = runningJob.getDAGStatus(); + if (dagStatus == null) { + return false; + } if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) { Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>(); - for (Vertex v : runningJob.getDAG().getVertices()) { - TezTaskStats tts = tezStats.getVertexStats(v.getName()); - tezScriptState.emitjobFinishedNotification(tts); - Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName()); - computeWarningAggregate(counterGroups, warningAggMap); + DAG dag = runningJob.getDAG(); + for (Vertex v : dag.getVertices()) { + TezVertexStats tts = tezStats.getVertexStats(dag.getName(), v.getName()); + if (tts == null) { + continue; //vertex groups + } + Map<String, Map<String, Long>> counterGroups = tts.getCounters(); + if (counterGroups == null) { + log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates."); + } else { + computeWarningAggregate(counterGroups, warningAggMap); + } } if (aggregateWarning) { CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log); } return true; - } else if (dagStatus.getState() == DAGStatus.State.FAILED) { - for (Vertex v : ((TezJob)runningJob).getDAG().getVertices()) { - TezTaskStats tts = tezStats.getVertexStats(v.getName()); - tezScriptState.emitJobFailedNotification(tts); - } } return false; } @@ -281,10 +347,6 @@ public class TezLauncher extends Launche VisitorException, IOException { log.debug("Entering TezLauncher.explain"); TezPlanContainer tezPlanContainer = compile(php, pc); - for (Map.Entry<OperatorKey,TezPlanContainerNode> entry : tezPlanContainer.getKeys().entrySet()) { - TezOperPlan tezPlan = entry.getValue().getNode(); - optimize(tezPlan, pc); - } if (format.equals("text")) { TezPlanContainerPrinter printer = new TezPlanContainerPrinter(ps, tezPlanContainer); @@ -296,7 +358,20 @@ public class TezLauncher extends Launche } } - public static void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException { + public TezPlanContainer compile(PhysicalPlan php, PigContext pc) + throws PlanException, IOException, VisitorException { + TezCompiler comp = new TezCompiler(php, pc); + comp.compile(); + TezPlanContainer planContainer = comp.getPlanContainer(); + for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer + .getKeys().entrySet()) { + TezOperPlan tezPlan = entry.getValue().getTezOperPlan(); + optimize(tezPlan, pc); + } + return planContainer; + } + + private void optimize(TezOperPlan tezPlan, PigContext pc) throws VisitorException { Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); boolean aggregateWarning = conf.getBoolean("aggregate.warning", false); @@ -304,10 +379,10 @@ public class TezLauncher extends Launche filter.visit(); // Run CombinerOptimizer on Tez plan - boolean nocombiner = conf.getBoolean(PigConfiguration.PROP_NO_COMBINER, false); + boolean nocombiner = conf.getBoolean(PigConfiguration.PIG_EXEC_NO_COMBINER, false); if (!pc.inIllustrator && !nocombiner) { boolean doMapAgg = Boolean.parseBoolean(pc.getProperties().getProperty( - PigConfiguration.PROP_EXEC_MAP_PARTAGG, "false")); + PigConfiguration.PIG_EXEC_MAP_PARTAGG, "false")); CombinerOptimizer co = new CombinerOptimizer(tezPlan, doMapAgg); co.visit(); co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log); @@ -321,7 +396,7 @@ public class TezLauncher extends Launche skOptimizer.visit(); } - boolean isMultiQuery = conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true); + boolean isMultiQuery = conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true); if (isMultiQuery) { // reduces the number of TezOpers in the Tez plan generated // by multi-query (multi-store) script. @@ -330,35 +405,32 @@ public class TezLauncher extends Launche } // Run AccumulatorOptimizer on Tez plan - boolean isAccum = conf.getBoolean(PigConfiguration.OPT_ACCUMULATOR, true); + boolean isAccum = conf.getBoolean(PigConfiguration.PIG_OPT_ACCUMULATOR, true); if (isAccum) { AccumulatorOptimizer accum = new AccumulatorOptimizer(tezPlan); accum.visit(); } // Use VertexGroup in Tez - boolean isUnionOpt = conf.getBoolean(PigConfiguration.TEZ_OPT_UNION, true); + boolean isUnionOpt = conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true); if (isUnionOpt) { UnionOptimizer uo = new UnionOptimizer(tezPlan); uo.visit(); } + } + + public static void processLoadAndParallelism(TezOperPlan tezPlan, PigContext pc) throws VisitorException { if (!pc.inExplain && !pc.inDumpSchema) { LoaderProcessor loaderStorer = new LoaderProcessor(tezPlan, pc); loaderStorer.visit(); ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc); parallelismSetter.visit(); + tezPlan.setEstimatedParallelism(parallelismSetter.getEstimatedTotalParallelism()); } } - public TezPlanContainer compile(PhysicalPlan php, PigContext pc) - throws PlanException, IOException, VisitorException { - TezCompiler comp = new TezCompiler(php, pc); - comp.compile(); - return comp.getPlanContainer(); - } - @Override public void kill() throws BackendException { if (runningJob != null) { @@ -368,9 +440,7 @@ public class TezLauncher extends Launche throw new BackendException(e); } } - if (executor != null) { - executor.shutdownNow(); - } + destroy(); } @Override @@ -385,4 +455,17 @@ public class TezLauncher extends Launche log.info("Cannot find job: " + jobID); } } + + @Override + public void destroy() { + try { + if (executor != null && !executor.isShutdown()) { + log.info("Shutting down thread pool"); + executor.shutdownNow(); + } + } catch (Exception e) { + log.warn("Error shutting down threadpool"); + } + } + }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Thu Nov 27 12:49:54 2014 @@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.pig.backend.hadoop.datastorage.HPath; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; @@ -43,6 +42,7 @@ public class TezResourceManager { private Path stagingDir; private FileSystem remoteFs; private Configuration conf; + private PigContext pigContext; public Map<String, Path> resources = new HashMap<String, Path>(); static public TezResourceManager getInstance() { @@ -54,9 +54,10 @@ public class TezResourceManager { public void init(PigContext pigContext, Configuration conf) throws IOException { if (!inited) { - this.stagingDir = ((HPath)FileLocalizer.getTemporaryResourcePath(pigContext)).getPath(); + this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext); this.remoteFs = FileSystem.get(conf); this.conf = conf; + this.pigContext = pigContext; this.inited = true; } } @@ -77,7 +78,7 @@ public class TezResourceManager { } // Ship the local resource to the staging directory on the remote FS - if (uri.toString().startsWith("file:")) { + if (!pigContext.getExecType().isLocal() && uri.toString().startsWith("file:")) { Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName)); remoteFs.copyFromLocalFile(resourcePath, remoteFsPath); remoteFs.setReplication(remoteFsPath, (short)conf.getInt(Job.SUBMIT_REPLICATION, 3)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Thu Nov 27 12:49:54 2014 @@ -29,8 +29,11 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.client.TezAppMasterStatus; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.TezConfiguration; @@ -80,9 +83,13 @@ public class TezSessionManager { private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>(); private static SessionInfo createSession(Configuration conf, - Map<String, LocalResource> requestedAMResources, Credentials creds) - throws TezException, IOException, InterruptedException { + Map<String, LocalResource> requestedAMResources, Credentials creds, + TezJobConfig tezJobConf) throws TezException, IOException, + InterruptedException { TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf); + TezScriptState ss = TezScriptState.get(); + ss.addDAGSettingsToConf(amConf); + adjustAMConfig(amConf, tezJobConf); String jobName = conf.get(PigContext.JOB_NAME, "pig"); TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds); tezClient.start(); @@ -94,6 +101,56 @@ public class TezSessionManager { return new SessionInfo(tezClient, requestedAMResources); } + private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) { + int requiredAMMaxHeap = -1; + int requiredAMResourceMB = -1; + int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get( + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT)); + int configuredAMResourceMB = amConf.getInt( + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT); + + if (tezJobConf.getEstimatedTotalParallelism() > 0) { + + int minAMMaxHeap = 3584; + int minAMResourceMB = 4096; + + // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb + // Increment by 512 mb for every additional 5K tasks. + for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) { + if (tezJobConf.getEstimatedTotalParallelism() > taskCount) { + requiredAMMaxHeap = minAMMaxHeap; + requiredAMResourceMB = minAMResourceMB; + break; + } + minAMMaxHeap = minAMMaxHeap - 512; + minAMResourceMB = minAMResourceMB - 512; + } + + if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) { + amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB); + log.info("Increasing " + + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from " + + configuredAMResourceMB + " to " + + requiredAMResourceMB + + " as the number of total estimated tasks is " + + tezJobConf.getEstimatedTotalParallelism()); + + if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) { + amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS) + + " -Xmx" + requiredAMMaxHeap + "M"); + log.info("Increasing Tez AM Heap Size from " + + configuredAMMaxHeap + "M to " + + requiredAMMaxHeap + + "M as the number of total estimated tasks is " + + tezJobConf.getEstimatedTotalParallelism()); + } + } + } + } + private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) throws TezException, IOException { @@ -106,7 +163,7 @@ public class TezSessionManager { } static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources, - Credentials creds) throws TezException, IOException, InterruptedException { + Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>(); SessionInfo newSession = null; sessionPoolLock.readLock().lock(); @@ -135,11 +192,12 @@ public class TezSessionManager { // We cannot find available AM, create new one // Create session outside of locks so that getClient/freeSession is not // blocked for parallel embedded pig runs - newSession = createSession(conf, requestedAMResources, creds); + newSession = createSession(conf, requestedAMResources, creds, tezJobConf); newSession.inUse = true; sessionPoolLock.writeLock().lock(); try { if (shutdown == true) { + log.info("Shutting down Tez session " + newSession.session); newSession.session.stop(); throw new IOException("TezSessionManager is shut down"); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Thu Nov 27 12:49:54 2014 @@ -26,6 +26,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; @@ -35,7 +36,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; -import org.apache.tez.mapreduce.hadoop.MRJobConfig; @InterfaceAudience.Private public class MRToTezHelper { @@ -117,6 +117,12 @@ public class MRToTezHelper { dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, "" + amCores); + dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS, + tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); + + dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS, + tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); + dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, "" + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Nov 27 12:49:54 2014 @@ -1,7 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.pig.backend.hadoop.executionengine.tez.util; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import org.apache.pig.PigException; @@ -12,19 +30,21 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTez; -import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez; -import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner; -import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; +import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.library.output.UnorderedKVOutput; @@ -172,4 +192,19 @@ public class TezCompilerUtil { edge.setIntermediateOutputValueClass(TUPLE_CLASS); } + /** + * Returns true if there are no loads or stores in a TezOperator. + * To be called only after LoaderProcessor is called + */ + static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException { + boolean intermediateReducer = false; + LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class); + // Not map and not final reducer + if (stores.size() <= 0 && + (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) { + intermediateReducer = true; + } + return intermediateReducer; + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Thu Nov 27 12:49:54 2014 @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.pig.backend.hadoop.executionengine.util; import java.util.List; @@ -5,6 +22,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.Accumulator; +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; @@ -27,6 +46,17 @@ import org.apache.pig.impl.PigContext; public class AccumulatorOptimizerUtil { private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class); + public static int getAccumulativeBatchSize() { + int batchSize = 20000; + if (PigMapReduce.sJobConfInternal.get() != null) { + String size = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_ACCUMULATIVE_BATCHSIZE); + if (size != null) { + batchSize = Integer.parseInt(size); + } + } + return batchSize; + } + public static void addAccumulator(PhysicalPlan plan) { // See if this is a map-reduce job List<PhysicalOperator> pos = plan.getRoots(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Thu Nov 27 12:49:54 2014 @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.pig.backend.hadoop.executionengine.util; import java.util.ArrayList; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Nov 27 12:49:54 2014 @@ -293,11 +293,25 @@ public class MapRedUtil { } }; + public static long getPathLength(FileSystem fs, FileStatus status) + throws IOException{ + return getPathLength(fs, status, Long.MAX_VALUE); + } + /** - * Returns the total number of bytes for this file, - * or if a directory all files in the directory. + * Returns the total number of bytes for this file, or if a directory all + * files in the directory. + * + * @param fs FileSystem + * @param status FileStatus + * @param max Maximum value of total length that will trigger exit. Many + * times we're only interested whether the total length of files is greater + * than X or not. In such case, we can exit the function early as soon as + * the max is reached. + * @return + * @throws IOException */ - public static long getPathLength(FileSystem fs, FileStatus status) + public static long getPathLength(FileSystem fs, FileStatus status, long max) throws IOException { if (!status.isDir()) { return status.getLen(); @@ -306,7 +320,8 @@ public class MapRedUtil { status.getPath(), hiddenFileFilter); long size = 0; for (FileStatus child : children) { - size += getPathLength(fs, child); + size += getPathLength(fs, child, max); + if (size > max) return size; } return size; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Thu Nov 27 12:49:54 2014 @@ -38,18 +38,16 @@ public class ParallelConstantVisitor ext @Override public void visitConstant(ConstantExpression cnst) throws VisitorException { - if (cnst.getRequestedParallelism() == -1) { - Object obj = cnst.getValue(); - if (obj instanceof Integer) { - if (replaced) { - // sample job should have only one ConstantExpression - throw new VisitorException("Invalid reduce plan: more " + - "than one ConstantExpression found in sampling job"); - } - cnst.setValue(rp); - cnst.setRequestedParallelism(rp); - replaced = true; + Object obj = cnst.getValue(); + if (obj instanceof Integer) { + if (replaced) { + // sample job should have only one ConstantExpression + throw new VisitorException("Invalid reduce plan: more " + + "than one ConstantExpression found in sampling job"); } + cnst.setValue(rp); + cnst.setRequestedParallelism(rp); + replaced = true; } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseBinaryConverter.java Thu Nov 27 12:49:54 2014 @@ -93,7 +93,7 @@ public class HBaseBinaryConverter implem @Override public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException { - return bytesToMap(b, fieldSchema); + throw new ExecException("Can't generate a Map from byte[]"); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Thu Nov 27 12:49:54 2014 @@ -24,6 +24,8 @@ import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -74,6 +76,7 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.pig.CollectableLoadFunc; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; import org.apache.pig.LoadPushDown; @@ -82,8 +85,10 @@ import org.apache.pig.OrderedLoadFunc; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.StoreFuncInterface; +import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; +import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -135,7 +140,8 @@ import com.google.common.collect.Lists; * <code>buddies</code> column family in the <code>SampleTableCopy</code> table. * */ -public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc { +public class HBaseStorage extends LoadFunc implements StoreFuncInterface, LoadPushDown, OrderedLoadFunc, StoreResources, + CollectableLoadFunc { private static final Log LOG = LogFactory.getLog(HBaseStorage.class); @@ -317,7 +323,7 @@ public class HBaseStorage extends LoadFu if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat. noWAL_ = true; } - } + } if (configuredOptions_.hasOption("minTimestamp")){ minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp")); @@ -719,7 +725,6 @@ public class HBaseStorage extends LoadFu Properties udfProps = getUDFProperties(); job.getConfiguration().setBoolean("pig.noSplitCombination", true); - initializeHBaseClassLoaderResources(job); m_conf = initializeLocalJobConfig(job); String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET); if (delegationTokenSet == null) { @@ -748,14 +753,23 @@ public class HBaseStorage extends LoadFu } } - private void initializeHBaseClassLoaderResources(Job job) throws IOException { + @Override + public List<String> getShipFiles() { // Depend on HBase to do the right thing when available, as of HBASE-9165 try { Method addHBaseDependencyJars = TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class); if (addHBaseDependencyJars != null) { - addHBaseDependencyJars.invoke(null, job.getConfiguration()); - return; + Configuration conf = new Configuration(); + addHBaseDependencyJars.invoke(null, conf); + if (conf.get("tmpjars") != null) { + String[] tmpjars = conf.getStrings("tmpjars"); + List<String> shipFiles = new ArrayList<String>(tmpjars.length); + for (String tmpjar : tmpjars) { + shipFiles.add(new URL(tmpjar).getPath()); + } + return shipFiles; + } } } catch (NoSuchMethodException e) { LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available." @@ -766,32 +780,32 @@ public class HBaseStorage extends LoadFu } catch (InvocationTargetException e) { LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + " failed. Falling back to previous logic.", e); + } catch (MalformedURLException e) { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" + + " had malformed url. Falling back to previous logic.", e); } - // fall back to manual class handling. - // Make sure the HBase, ZooKeeper, and Guava jars get shipped. - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), - org.apache.hadoop.hbase.client.HTable.class, // main hbase jar or hbase-client - org.apache.hadoop.hbase.mapreduce.TableSplit.class, // main hbase jar or hbase-server - com.google.common.collect.Lists.class, // guava - org.apache.zookeeper.ZooKeeper.class); // zookeeper + + List<Class> classList = new ArrayList<Class>(); + classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client + classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server + classList.add(com.google.common.collect.Lists.class); // guava + classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ - addClassToJobIfExists(job, "org.cloudera.htrace.Trace"); // htrace - addClassToJobIfExists(job, "org.apache.hadoop.hbase.protobuf.generated.HBaseProtos"); // hbase-protocol - addClassToJobIfExists(job, "org.apache.hadoop.hbase.TableName"); // hbase-common - addClassToJobIfExists(job, "org.apache.hadoop.hbase.CompatibilityFactory"); // hbase-hadoop-compar - addClassToJobIfExists(job, "org.jboss.netty.channel.ChannelFactory"); // netty - } - - private void addClassToJobIfExists(Job job, String className) throws IOException { - Class klass = null; - try { - klass = Class.forName(className); - } catch (ClassNotFoundException e) { - LOG.debug("Skipping adding jar for class: " + className); - return; - } + addClassToList("org.cloudera.htrace.Trace", classList); // htrace + addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol + addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common + addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar + addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty + return FuncUtils.getShipFiles(classList); + } - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), klass); + private void addClassToList(String className, List<Class> classList) { + try { + Class klass = Class.forName(className); + classList.add(klass); + } catch (ClassNotFoundException e) { + LOG.debug("Skipping adding jar for class: " + className); + } } private JobConf initializeLocalJobConfig(Job job) { @@ -940,21 +954,25 @@ public class HBaseStorage extends LoadFu DataType.findType(t.get(i)) : fieldSchemas[i].getType())); } else { Map<String, Object> cfMap = (Map<String, Object>) t.get(i); - for (String colName : cfMap.keySet()) { - if (LOG.isDebugEnabled()) { - LOG.debug("putNext - colName=" + colName + - ", class: " + colName.getClass()); + if (cfMap!=null) { + for (String colName : cfMap.keySet()) { + if (LOG.isDebugEnabled()) { + LOG.debug("putNext - colName=" + colName + + ", class: " + colName.getClass()); + } + // TODO deal with the fact that maps can have types now. Currently we detect types at + // runtime in the case of storing to a cf, which is suboptimal. + put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts, + objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName)))); } - // TODO deal with the fact that maps can have types now. Currently we detect types at - // runtime in the case of storing to a cf, which is suboptimal. - put.add(columnInfo.getColumnFamily(), Bytes.toBytes(colName.toString()), ts, - objToBytes(cfMap.get(colName), DataType.findType(cfMap.get(colName)))); } } } try { - writer.write(null, put); + if (!put.isEmpty()) { + writer.write(null, put); + } } catch (InterruptedException e) { throw new IOException(e); } @@ -1031,7 +1049,6 @@ public class HBaseStorage extends LoadFu schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema); } - initializeHBaseClassLoaderResources(job); m_conf = initializeLocalJobConfig(job); // Not setting a udf property and getting the hbase delegation token // only once like in setLocation as setStoreLocation gets different Job @@ -1115,28 +1132,24 @@ public class HBaseStorage extends LoadFu return new RequiredFieldResponse(true); } - @Override - public WritableComparable<InputSplit> getSplitComparable(InputSplit split) - throws IOException { - return new WritableComparable<InputSplit>() { - TableSplit tsplit = new TableSplit(); - - @Override - public void readFields(DataInput in) throws IOException { - tsplit.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - tsplit.write(out); - } + public void ensureAllKeyInstancesInSameSplit() throws IOException { + /** + * no-op because hbase keys are unique + * This will also work with things like DelimitedKeyPrefixRegionSplitPolicy + * if you need a partial key match to be included in the split + */ + LOG.debug("ensureAllKeyInstancesInSameSplit"); + } - @Override - public int compareTo(InputSplit split) { - return tsplit.compareTo((TableSplit) split); - } - }; + @Override + public WritableComparable<TableSplit> getSplitComparable(InputSplit split) throws IOException { + if (split instanceof TableSplit) { + return new TableSplitComparable((TableSplit) split); + } else { + throw new RuntimeException("LoadFunc expected split of type TableSplit but was " + split.getClass().getName()); + } } + /** * Class to encapsulate logic around which column names were specified in each Modified: pig/branches/spark/src/org/apache/pig/builtin/ABS.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ABS.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/ABS.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/ABS.java Thu Nov 27 12:49:54 2014 @@ -42,7 +42,7 @@ public class ABS extends EvalFunc<Double * @return output returns a single numeric value, absolute value of the argument */ public Double exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) + if (input == null || input.size() == 0 || input.get(0) == null) return null; Double d; Modified: pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/AvroStorage.java Thu Nov 27 12:49:54 2014 @@ -31,6 +31,7 @@ import org.apache.avro.Schema.Type; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericData; +import org.apache.avro.mapred.AvroInputFormat; import org.apache.avro.mapred.AvroOutputFormat; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -63,9 +64,11 @@ import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreFunc; import org.apache.pig.StoreFuncInterface; +import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; import org.apache.pig.impl.util.avro.AvroArrayReader; @@ -82,7 +85,7 @@ import com.google.common.collect.Maps; * */ public class AvroStorage extends LoadFunc - implements StoreFuncInterface, LoadMetadata, LoadPushDown { + implements StoreFuncInterface, LoadMetadata, LoadPushDown, StoreResources { /** * Creates new instance of Pig Storage function, without specifying @@ -593,7 +596,11 @@ public class AvroStorage extends LoadFun } else { rr = new AvroRecordReader(s); } - rr.initialize(is, tc); + try { + rr.initialize(is, tc); + } finally { + rr.close(); + } tc.setStatus(is.toString()); return rr; } @@ -674,4 +681,9 @@ public class AvroStorage extends LoadFun } + @Override + public List<String> getShipFiles() { + Class[] classList = new Class[] {Schema.class, AvroInputFormat.class}; + return FuncUtils.getShipFiles(classList); + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BigDecimalAbs.java Thu Nov 27 12:49:54 2014 @@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple; public class BigDecimalAbs extends EvalFunc<BigDecimal> { @Override public BigDecimal exec(Tuple input) throws IOException { + if (input == null || input.size() == 0 || input.get(0) == null) + return null; return ((BigDecimal)input.get(0)).abs(); } Modified: pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BigIntegerAbs.java Thu Nov 27 12:49:54 2014 @@ -27,6 +27,8 @@ import org.apache.pig.data.Tuple; public class BigIntegerAbs extends EvalFunc<BigInteger> { @Override public BigInteger exec(Tuple input) throws IOException { + if (input == null || input.size() == 0 || input.get(0) == null) + return null; return ((BigInteger)input.get(0)).abs(); } Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Thu Nov 27 12:49:54 2014 @@ -21,7 +21,9 @@ package org.apache.pig.builtin; import java.io.ByteArrayInputStream; import java.io.DataInputStream; +import java.io.File; import java.io.FileInputStream; +import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -29,7 +31,6 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; - import org.apache.pig.FilterFunc; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; @@ -94,9 +95,22 @@ public class Bloom extends FilterFunc { private void init() throws IOException { filter = new BloomFilter(); - String dcFile = "./" + getFilenameFromPath(bloomFile) + - "/part-r-00000"; - filter.readFields(new DataInputStream(new FileInputStream(dcFile))); + String dir = "./" + getFilenameFromPath(bloomFile); + String[] partFiles = new File(dir) + .list(new FilenameFilter() { + @Override + public boolean accept(File current, String name) { + return name.startsWith("part"); + } + }); + + String dcFile = dir + "/" + partFiles[0]; + DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); + try { + filter.readFields(dis); + } finally { + dis.close(); + } } /** Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloom.java Thu Nov 27 12:49:54 2014 @@ -39,15 +39,15 @@ import org.apache.pig.impl.logicalLayer. * define bb BuildBloom('jenkins', '100', '0.1'); * A = load 'foo' as (x, y); * B = group A all; - * C = foreach B generate BuildBloom(A.x); + * C = foreach B generate bb(A.x); * store C into 'mybloom'; * The bloom filter can be on multiple keys by passing more than one field * (or the entire bag) to BuildBloom. * The resulting file can then be used in a Bloom filter as: - * define bloom Bloom(mybloom); + * define bloom Bloom('mybloom'); * A = load 'foo' as (x, y); * B = load 'bar' as (z); - * C = filter B by Bloom(z); + * C = filter B by bloom(z); * D = join C by z, A by x; * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}. */ Modified: pig/branches/spark/src/org/apache/pig/builtin/Distinct.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Distinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Distinct.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Distinct.java Thu Nov 27 12:49:54 2014 @@ -22,6 +22,9 @@ import java.io.IOException; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; +import org.apache.pig.JVMReuseManager; +import org.apache.pig.PigConfiguration; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.data.BagFactory; @@ -38,35 +41,35 @@ import org.apache.pig.data.TupleFactory; */ public class Distinct extends EvalFunc<DataBag> implements Algebraic { - private static BagFactory bagFactory = BagFactory.getInstance(); private static TupleFactory tupleFactory = TupleFactory.getInstance(); - /* (non-Javadoc) - * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) - */ + private static boolean initialized = false; + private static boolean useDefaultBag = false; + + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(Distinct.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + initialized = false; + useDefaultBag = false; + } + @Override public DataBag exec(Tuple input) throws IOException { return getDistinct(input); } - /* (non-Javadoc) - * @see org.apache.pig.Algebraic#getFinal() - */ @Override public String getFinal() { return Final.class.getName(); } - /* (non-Javadoc) - * @see org.apache.pig.Algebraic#getInitial() - */ @Override public String getInitial() { return Initial.class.getName(); } - /* (non-Javadoc) - * @see org.apache.pig.Algebraic#getIntermed() - */ @Override public String getIntermed() { return Intermediate.class.getName(); @@ -74,13 +77,10 @@ public class Distinct extends EvalFunc< static public class Initial extends EvalFunc<Tuple> { - /* (non-Javadoc) - * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) - */ @Override public Tuple exec(Tuple input) throws IOException { // the input has a single field which is a tuple - // representing the data we want to distinct. + // representing the data we want to distinct. // unwrap, put in a bag and send down try { Tuple single = (Tuple)input.get(0); @@ -94,9 +94,6 @@ public class Distinct extends EvalFunc< static public class Intermediate extends EvalFunc<Tuple> { - /* (non-Javadoc) - * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) - */ @Override public Tuple exec(Tuple input) throws IOException { return tupleFactory.newTuple(getDistinctFromNestedBags(input, this)); @@ -105,30 +102,27 @@ public class Distinct extends EvalFunc< static public class Final extends EvalFunc<DataBag> { - /* (non-Javadoc) - * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple) - */ @Override public DataBag exec(Tuple input) throws IOException { return getDistinctFromNestedBags(input, this); } } - - static private DataBag createDataBag() { - // by default, we create InternalSortedBag, unless user configures - // explicitly to use old bag - String bagType = null; - if (PigMapReduce.sJobConfInternal.get() != null) { - bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type"); - } - - if (bagType != null && bagType.equalsIgnoreCase("default")) { - return BagFactory.getInstance().newDistinctBag(); - } else { - return new InternalDistinctBag(3); - } + + private static DataBag createDataBag() { + if (!initialized) { + initialized = true; + if (PigMapReduce.sJobConfInternal.get() != null) { + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE); + if (bagType != null && bagType.equalsIgnoreCase("default")) { + useDefaultBag = true; + } + } + } + // by default, we create InternalDistinctBag, unless user configures + // explicitly to use old bag + return useDefaultBag ? BagFactory.getInstance().newDistinctBag() : new InternalDistinctBag(3); } - + static private DataBag getDistinctFromNestedBags(Tuple input, EvalFunc evalFunc) throws IOException { DataBag result = createDataBag(); long progressCounter = 0; @@ -144,7 +138,7 @@ public class Distinct extends EvalFunc< for (Tuple t : (DataBag)tuple.get(0)) { result.add(t); ++progressCounter; - if((progressCounter % 1000) == 0){ + if((progressCounter % 1000) == 0){ evalFunc.progress(); } } @@ -154,7 +148,7 @@ public class Distinct extends EvalFunc< } return result; } - + protected DataBag getDistinct(Tuple input) throws IOException { try { DataBag inputBg = (DataBag)input.get(0); Modified: pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/DoubleRound.java Thu Nov 27 12:49:54 2014 @@ -37,7 +37,7 @@ public class DoubleRound extends EvalFun */ @Override public Long exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) + if (input == null || input.size() == 0 || input.get(0) == null) return null; try{ Modified: pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/FloatRound.java Thu Nov 27 12:49:54 2014 @@ -39,7 +39,7 @@ public class FloatRound extends EvalFunc */ @Override public Integer exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) + if (input == null || input.size() == 0 || input.get(0) == null) return null; try{ Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Thu Nov 27 12:49:54 2014 @@ -19,28 +19,26 @@ package org.apache.pig.builtin; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.math.BigDecimal; -import java.math.BigInteger; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.joda.time.format.ISODateTimeFormat; import org.joda.time.format.DateTimeFormatter; - import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParseException; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.JsonToken; - import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - import org.apache.pig.Expression; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -56,6 +54,7 @@ import org.apache.pig.data.DataByteArray import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; import org.apache.pig.parser.ParserException; @@ -153,23 +152,31 @@ public class JsonLoader extends LoadFunc // isn't what we expect we return a tuple with null fields rather than // throwing an exception. That way a few mangled lines don't fail the // job. - if (p.nextToken() != JsonToken.START_OBJECT) { - warn("Bad record, could not find start of record " + - val.toString(), PigWarning.UDF_WARNING_1); - return t; - } - - // Read each field in the record - for (int i = 0; i < fields.length; i++) { - t.set(i, readField(p, fields[i], i)); - } - - if (p.nextToken() != JsonToken.END_OBJECT) { - warn("Bad record, could not find end of record " + - val.toString(), PigWarning.UDF_WARNING_1); - return t; + + try { + if (p.nextToken() != JsonToken.START_OBJECT) { + warn("Bad record, could not find start of record " + + val.toString(), PigWarning.UDF_WARNING_1); + return t; + } + + // Read each field in the record + for (int i = 0; i < fields.length; i++) { + t.set(i, readField(p, fields[i], i)); + } + + if (p.nextToken() != JsonToken.END_OBJECT) { + warn("Bad record, could not find end of record " + + val.toString(), PigWarning.UDF_WARNING_1); + return t; + } + + } catch (JsonParseException jpe) { + warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1); + } finally { + p.close(); } - p.close(); + return t; } @@ -242,7 +249,7 @@ public class JsonLoader extends LoadFunc case DataType.BIGDECIMAL: tok = p.nextToken(); if (tok == JsonToken.VALUE_NULL) return null; - return p.getDecimalValue(); + return new BigDecimal(p.getText()); case DataType.MAP: // Should be a start of the map object @@ -372,4 +379,11 @@ public class JsonLoader extends LoadFunc throws IOException { // We don't have partitions } + + @Override + public List<String> getShipFiles() { + List<String> cacheFiles = new ArrayList<String>(); + Class[] classList = new Class[] {JsonFactory.class}; + return FuncUtils.getShipFiles(classList); + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/JsonStorage.java Thu Nov 27 12:49:54 2014 @@ -19,17 +19,15 @@ package org.apache.pig.builtin; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; import java.math.BigDecimal; import java.math.BigInteger; -import org.joda.time.DateTime; - import org.codehaus.jackson.JsonEncoding; import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -38,25 +36,18 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; - import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreMetadata; import org.apache.pig.StoreFunc; +import org.apache.pig.StoreResources; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.data.DataBag; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; - /** * A JSON Pig store function. Each Pig tuple is stored on one line (as one * value for TextOutputFormat) so that it can be read easily using @@ -66,7 +57,7 @@ import java.util.TreeMap; * with mapping between JSON and Pig types. The schema file share the same format * as the one we use in PigStorage. */ -public class JsonStorage extends StoreFunc implements StoreMetadata { +public class JsonStorage extends StoreFunc implements StoreMetadata, StoreResources { protected RecordWriter writer = null; protected ResourceSchema schema = null; @@ -318,4 +309,14 @@ public class JsonStorage extends StoreFu return s; } + @Override + public List<String> getShipFiles() { + Class[] classList = new Class[] {JsonFactory.class}; + return FuncUtils.getShipFiles(classList); + } + + @Override + public List<String> getCacheFiles() { + return null; + } } Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Thu Nov 27 12:49:54 2014 @@ -20,11 +20,11 @@ package org.apache.pig.builtin; import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; -import java.util.concurrent.TimeUnit; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -37,7 +37,6 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.orc.CompressionKind; import org.apache.hadoop.hive.ql.io.orc.OrcFile; @@ -50,12 +49,13 @@ import org.apache.hadoop.hive.ql.io.orc. import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; @@ -82,6 +82,7 @@ import org.apache.pig.Expression.BinaryE import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.ResourceStatistics; import org.apache.pig.StoreFuncInterface; +import org.apache.pig.StoreResources; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.data.DataType; @@ -93,6 +94,7 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.impl.util.orc.OrcUtils; import org.joda.time.DateTime; +import com.esotericsoftware.kryo.io.Input; import com.google.common.annotations.VisibleForTesting; /** @@ -111,7 +113,7 @@ import com.google.common.annotations.Vis * <li><code>-v, --version</code> Sets the version of the file that will be written * </ul> **/ -public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown { +public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata, LoadPushDown, LoadPredicatePushdown, StoreResources { //TODO Make OrcInputFormat.SARG_PUSHDOWN visible private static final String SARG_PUSHDOWN = "sarg.pushdown"; @@ -384,6 +386,26 @@ public class OrcStorage extends LoadFunc } } + @Override + public List<String> getShipFiles() { + List<String> cacheFiles = new ArrayList<String>(); + String hadoopVersion = "20S"; + if (Utils.isHadoop23() || Utils.isHadoop2()) { + hadoopVersion = "23"; + } + Class hadoopVersionShimsClass; + try { + hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + + hadoopVersion + "Shims"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); + } + Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class, + org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass, + Input.class}; + return FuncUtils.getShipFiles(classList); + } + private static Path getFirstFile(String location, FileSystem fs) throws IOException { String[] locations = getPathStrings(location); Path[] paths = new Path[locations.length]; @@ -498,8 +520,6 @@ public class OrcStorage extends LoadFunc for (ResourceFieldSchema field : schema.getFields()) { switch(field.getType()) { case DataType.BOOLEAN: - // TODO: ORC does not seem to support it - break; case DataType.INTEGER: case DataType.LONG: case DataType.FLOAT: @@ -671,14 +691,12 @@ public class OrcStorage extends LoadFunc } private Object getSearchArgObjValue(Object value) { - // TODO Test BigInteger, BigInteger and DateTime if (value instanceof BigInteger) { - return HiveDecimal.create(((BigInteger)value)); + return new BigDecimal((BigInteger)value); } else if (value instanceof BigDecimal) { - return HiveDecimal.create(((BigDecimal)value), false); + return value; } else if (value instanceof DateTime) { - //TODO is this right based on what DateTimeWritable.dateToDays() does? What about pig.datetime.default.tz? - return new DateWritable((int)(((DateTime)value).getMillis() / TimeUnit.DAYS.toMillis(1))); + return new Timestamp(((DateTime)value).getMillis()); } else { return value; } Modified: pig/branches/spark/src/org/apache/pig/builtin/ROUND.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/ROUND.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/ROUND.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/ROUND.java Thu Nov 27 12:49:54 2014 @@ -44,7 +44,7 @@ public class ROUND extends EvalFunc<Long */ @Override public Long exec(Tuple input) throws IOException { - if (input == null || input.size() == 0) + if (input == null || input.size() == 0 || input.get(0) == null) return null; try{ Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Thu Nov 27 12:49:54 2014 @@ -148,7 +148,9 @@ public class TextLoader extends LoadFunc @Override public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema schema) throws IOException { - return bytesToMap(b, schema); + int errCode = 2109; + String msg = "TextLoader does not support conversion to Map."; + throw new ExecException(msg, errCode, PigException.BUG); } /**
