Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Feb 22 09:43:41 2017 @@ -22,6 +22,7 @@ import java.io.PrintStream; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -166,7 +167,7 @@ public class TezLauncher extends Launche tezStats = new TezPigScriptStats(pc); PigStats.start(tezStats); - conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); + conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); TezJobCompiler jc = new TezJobCompiler(pc, conf); TezPlanContainer tezPlanContainer = compile(php, pc); @@ -174,6 +175,10 @@ public class TezLauncher extends Launche tezScriptState.emitInitialPlanNotification(tezPlanContainer); tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch + boolean stop_on_failure = + Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false")); + boolean stoppedOnFailure = false; + TezPlanContainerNode tezPlanContainerNode; TezOperPlan tezPlan; int processedDAGs = 0; @@ -252,7 +257,18 @@ public class TezLauncher extends Launche ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100); } handleUnCaughtException(pc); - tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed()); + boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed(); + tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded); + // if 
stop_on_failure is enabled, we need to stop immediately when any job has failed + if (!tezDAGSucceeded) { + if (stop_on_failure) { + stoppedOnFailure = true; + break; + } else { + log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you " + + "want Pig to stop immediately on failure."); + } + } } tezStats.finish(); @@ -279,6 +295,11 @@ public class TezLauncher extends Launche } } + if (stoppedOnFailure) { + throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017, + PigException.REMOTE_ENVIRONMENT); + } + return tezStats; } @@ -402,9 +423,11 @@ public class TezLauncher extends Launche TezCompiler comp = new TezCompiler(php, pc); comp.compile(); TezPlanContainer planContainer = comp.getPlanContainer(); - for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer - .getKeys().entrySet()) { - TezOperPlan tezPlan = entry.getValue().getTezOperPlan(); + // Doing a sort so that test plan printed remains same between jdk7 and jdk8 + List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet()); + Collections.sort(opKeys); + for (OperatorKey opKey : opKeys) { + TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan(); optimize(tezPlan, pc); } return planContainer; @@ -499,7 +522,7 @@ public class TezLauncher extends Launche @Override public void killJob(String jobID, Configuration conf) throws BackendException { - if (runningJob != null && runningJob.getApplicationId().toString() == jobID) { + if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) { try { runningJob.killJob(); } catch (Exception e) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Wed Feb 22 09:43:41 2017 @@ -39,6 +39,8 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; +import com.google.common.annotations.VisibleForTesting; + public class TezResourceManager { private static TezResourceManager instance = null; private boolean inited = false; @@ -59,6 +61,7 @@ public class TezResourceManager { /** * This method is only used by test code to reset state. 
*/ + @VisibleForTesting public static void dropInstance() { instance = null; } @@ -66,7 +69,7 @@ public class TezResourceManager { public void init(PigContext pigContext, Configuration conf) throws IOException { if (!inited) { this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext); - this.remoteFs = FileSystem.get(conf); + this.remoteFs = resourcesDir.getFileSystem(conf); this.conf = conf; this.pigContext = pigContext; this.inited = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Wed Feb 22 09:43:41 2017 @@ -18,7 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -29,9 +31,11 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.client.TezAppMasterStatus; @@ -46,13 +50,13 @@ public class 
TezSessionManager { private static final Log log = LogFactory.getLog(TezSessionManager.class); static { - Runtime.getRuntime().addShutdownHook(new Thread() { + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { TezSessionManager.shutdown(); } - }); + }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); } private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock(); @@ -61,11 +65,17 @@ public class TezSessionManager { private TezSessionManager() { } - public static class SessionInfo { - SessionInfo(TezClient session, Map<String, LocalResource> resources) { + private static class SessionInfo { + + public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) { this.session = session; + this.config = config; this.resources = resources; } + + public TezConfiguration getConfig() { + return config; + } public Map<String, LocalResource> getResources() { return resources; } @@ -77,20 +87,23 @@ public class TezSessionManager { } private TezClient session; private Map<String, LocalResource> resources; + private TezConfiguration config; private boolean inUse = false; } private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>(); - private static SessionInfo createSession(Configuration conf, + private static SessionInfo createSession(TezConfiguration amConf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { - TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf); + MRToTezHelper.translateMRSettingsForTezAM(amConf); TezScriptState ss = TezScriptState.get(); ss.addDAGSettingsToConf(amConf); - adjustAMConfig(amConf, tezJobConf); - String jobName = conf.get(PigContext.JOB_NAME, "pig"); + if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) { + adjustAMConfig(amConf, tezJobConf); + } + String jobName = amConf.get(PigContext.JOB_NAME, 
"pig"); TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds); try { tezClient.start(); @@ -104,12 +117,10 @@ public class TezSessionManager { tezClient.stop(); throw new RuntimeException(e); } - return new SessionInfo(tezClient, requestedAMResources); + return new SessionInfo(tezClient, amConf, requestedAMResources); } private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) { - int requiredAMMaxHeap = -1; - int requiredAMResourceMB = -1; String amLaunchOpts = amConf.get( TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); @@ -122,8 +133,10 @@ public class TezSessionManager { // Need more room for native memory/virtual address space // when close to 4G due to 32-bit jvm 4G limit - int minAMMaxHeap = 3200; - int minAMResourceMB = 4096; + int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200; + int maxAMResourceMB = 4096; + int requiredAMResourceMB = maxAMResourceMB; + int requiredAMMaxHeap = maxAMHeap; // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb // Increment container size by 512 mb for every additional 5K tasks. 
@@ -135,22 +148,38 @@ public class TezSessionManager { // 5000 and above - 1024Xmx, 1536 (512 native memory) for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) { if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) { - requiredAMMaxHeap = minAMMaxHeap; - requiredAMResourceMB = minAMResourceMB; break; } - minAMResourceMB = minAMResourceMB - 512; - minAMMaxHeap = minAMResourceMB - 512; + requiredAMResourceMB = requiredAMResourceMB - 512; + requiredAMMaxHeap = requiredAMResourceMB - 512; + } + + if (tezJobConf.getTotalVertices() > 30) { + //Add 512 mb per 30 vertices + int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30); + requiredAMResourceMB = requiredAMResourceMB + additionaMem; + requiredAMMaxHeap = requiredAMResourceMB - 512; + } + + if (tezJobConf.getMaxOutputsinSingleVertex() > 10) { + //Add 256 mb per 5 outputs if a vertex has more than 10 outputs + int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5); + requiredAMResourceMB = requiredAMResourceMB + additionaMem; + requiredAMMaxHeap = requiredAMResourceMB - 512; } + requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB); + requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap); + if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) { amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB); log.info("Increasing " + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from " + configuredAMResourceMB + " to " + requiredAMResourceMB - + " as the number of total estimated tasks is " - + tezJobConf.getEstimatedTotalParallelism()); + + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() + + ", total vertices = " + tezJobConf.getTotalVertices() + + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) { amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, @@ -158,8 +187,9 @@ public class TezSessionManager 
{ log.info("Increasing Tez AM Heap Size from " + configuredAMMaxHeap + "M to " + requiredAMMaxHeap - + "M as the number of total estimated tasks is " - + tezJobConf.getEstimatedTotalParallelism()); + + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() + + ", total vertices = " + tezJobConf.getTotalVertices() + + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now " + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)); } @@ -178,7 +208,22 @@ public class TezSessionManager { return true; } - static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources, + private static boolean validateSessionConfig(SessionInfo currentSession, + Configuration newSessionConfig) + throws TezException, IOException { + // If DAG recovery is disabled for one and enabled for another, do not reuse + if (currentSession.getConfig().getBoolean( + TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT) + != newSessionConfig.getBoolean( + TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { + return false; + } + return true; + } + + static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>(); SessionInfo newSession = null; @@ -196,7 +241,8 @@ public class TezSessionManager { sessionsToRemove.add(sessionInfo); } else if (!sessionInfo.inUse && appMasterStatus.equals(TezAppMasterStatus.READY) - && validateSessionResources(sessionInfo,requestedAMResources)) { + && validateSessionResources(sessionInfo,requestedAMResources) + && validateSessionConfig(sessionInfo, conf)) { sessionInfo.inUse = true; return sessionInfo.session; } @@ -253,6 +299,11 @@ public class TezSessionManager { 
synchronized (sessionInfo) { if (sessionInfo.session == session) { log.info("Stopping Tez session " + session); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Shutting down Tez session " + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); session.stop(); sessionToRemove = sessionInfo; break; @@ -279,19 +330,30 @@ public class TezSessionManager { shutdown = true; for (SessionInfo sessionInfo : sessionPool) { synchronized (sessionInfo) { + TezClient session = sessionInfo.session; try { - if (sessionInfo.session.getAppMasterStatus().equals( + String timeStamp = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); + if (session.getAppMasterStatus().equals( TezAppMasterStatus.SHUTDOWN)) { log.info("Tez session is already shutdown " - + sessionInfo.session); + + session); + System.err.println(timeStamp + + " Tez session is already shutdown " + session + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); continue; } - log.info("Shutting down Tez session " - + sessionInfo.session); - sessionInfo.session.stop(); + log.info("Shutting down Tez session " + session); + // Since hadoop calls org.apache.log4j.LogManager.shutdown(); + // the log.info message is not displayed with shutdown hook in Oozie + System.err.println(timeStamp + " Shutting down Tez session " + + ", sessionName=" + session.getClientName() + + ", applicationId=" + session.getAppMasterApplicationId()); + session.stop(); } catch (Exception e) { log.error("Error shutting down Tez session " - + sessionInfo.session, e); + + session, e); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Feb 22 09:43:41 2017 @@ -32,10 +32,12 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.hash.Hash; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; @@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; @@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan private Map<PhysicalOperator, TezOperator> phyToTezOpMap; + // Contains the inputs to operator like join, with the list maintaining the + // same order of join from left to right + private Map<TezOperator, List<TezOperator>> inputsMap; + public static final String USER_COMPARATOR_MARKER = "user.comparator.func:"; public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold"; public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation"; @@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan private boolean optimisticFileConcatenation = false; private List<String> readOnceLoadFuncs = null; + private Configuration conf; + private 
POLocalRearrangeTezFactory localRearrangeFactory; public TezCompiler(PhysicalPlan plan, PigContext pigContext) @@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan this.pigContext = pigContext; pigProperties = pigContext.getProperties(); + conf = ConfigurationUtil.toConfiguration(pigProperties, false); splitsSeen = Maps.newHashMap(); tezPlan = new TezOperPlan(); nig = NodeIdGenerator.getGenerator(); @@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan scope = roots.get(0).getOperatorKey().getScope(); localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig); phyToTezOpMap = Maps.newHashMap(); + inputsMap = Maps.newHashMap(); fileConcatenationThreshold = Integer.parseInt(pigProperties .getProperty(FILE_CONCATENATION_THRESHOLD, "100")); @@ -655,15 +671,8 @@ public class TezCompiler extends PhyPlan blocking(); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); - // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented - // with a global variable and a specific DistinctCombiner class. This seems better. 
- PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan; - addDistinctPlan(combinePlan, 1); - - POLocalRearrangeTez clr = localRearrangeFactory.create(); - clr.setOutputKey(curTezOp.getOperatorKey().toString()); - clr.setDistinct(true); - combinePlan.addAsLeaf(clr); + TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey()); + edge.setNeedsDistinctCombiner(true); curTezOp.markDistinct(); addDistinctPlan(curTezOp.plan, op.getRequestedParallelism()); @@ -856,6 +865,7 @@ public class TezCompiler extends PhyPlan } else { curTezOp.plan.addAsLeaf(op); } + phyToTezOpMap.put(op, curTezOp); } catch (Exception e) { int errCode = 2034; @@ -900,6 +910,7 @@ public class TezCompiler extends PhyPlan public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException { try { blocking(); + inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs))); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); curTezOp.setRequestedParallelism(op.getRequestedParallelism()); if (op.isCross()) { @@ -1088,7 +1099,7 @@ public class TezCompiler extends PhyPlan indexerTezOp.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(); + FileSpec strFile = getTempFileSpec(pigContext); st.setSFile(strFile); indexAggrOper.plan.addAsLeaf(st); indexAggrOper.setClosed(true); @@ -1255,7 +1266,7 @@ public class TezCompiler extends PhyPlan rightTezOprAggr.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(); + FileSpec strFile = getTempFileSpec(pigContext); st.setSFile(strFile); rightTezOprAggr.plan.addAsLeaf(st); rightTezOprAggr.setClosed(true); @@ -1346,6 +1357,9 @@ public class TezCompiler extends PhyPlan } else if (op.getNumInps() > 1) { curTezOp.markCogroup(); } + } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { + curTezOp.markRegularJoin(); + addBloomToJoin(op, 
curTezOp); } } catch (Exception e) { int errCode = 2034; @@ -1354,6 +1368,132 @@ public class TezCompiler extends PhyPlan } } + private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException { + + List<TezOperator> inputs = inputsMap.get(curTezOp); + TezOperator buildBloomOp; + List<TezOperator> applyBloomOps = new ArrayList<>(); + + String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY); + boolean createBloomInMap = "map".equals(strategy); + if (!createBloomInMap && !strategy.equals("reduce")) { + throw new PlanException(new IllegalArgumentException( + "Invalid value for " + + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - " + + strategy + ". Valid values are map and reduce")); + } + int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS); + int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES); + int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf); + int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE)); + + // We build bloom of the right most input and apply the bloom filter on the left inputs by default. 
+ // But in case of left outer join we build bloom of the left input and use it on the right input + boolean[] inner = op.getPkgr().getInner(); + boolean skipNullKeys = true; + if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right + buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input + for (int i = 0; i < (inner.length - 1); i++) { + applyBloomOps.add(inputs.get(i)); + } + skipNullKeys = inner[0]; + } else { + // Left outer join + skipNullKeys = false; + buildBloomOp = inputs.get(0); // Bloom filter is built from left most input + for (int i = 1; i < inner.length; i++) { + applyBloomOps.add(inputs.get(i)); + } + } + + // Add BuildBloom operator to the input + POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0); + POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType); + bbr.setSkipNullKeys(skipNullKeys); + buildBloomOp.plan.remove(lr); + buildBloomOp.plan.addAsLeaf(bbr); + + // Add a new reduce vertex that will construct the final bloom filter + // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy + // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy + TezOperator combineBloomOp = getTezOp(); + tezPlan.add(combineBloomOp); + combineBloomOp.markBuildBloom(); + // Explicitly set the parallelism for the new vertex to number of bloom filters. 
+ // Auto parallelism will bring it down based on the actual output size + combineBloomOp.setEstimatedParallelism(numBloomFilters); + // We don't want parallelism to be changed during the run by grace auto parallelism + // It will take the whole input size and estimate way higher + combineBloomOp.setDontEstimateParallelism(true); + + String combineBloomOpKey = combineBloomOp.getOperatorKey().toString(); + TezEdgeDescriptor edge = new TezEdgeDescriptor(); + TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge); + bbr.setBloomOutputKey(combineBloomOpKey); + + + POPackage pkg = new POPackage(OperatorKey.genOpKey(scope)); + pkg.setNumInps(1); + BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);; + pkgr.setKeyType(DataType.INTEGER); + pkg.setPkgr(pkgr); + POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); + combineBloomOp.plan.addAsLeaf(pkg); + combineBloomOp.plan.addAsLeaf(combineBloomOutput); + + edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); + + // Add combiner as well. 
+ POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); + BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType); + combinerPkgr.setCombiner(true); + combinerPkgr.setKeyType(DataType.INTEGER); + pkg_c.setPkgr(combinerPkgr); + pkg_c.setNumInps(1); + edge.combinePlan.addAsLeaf(pkg_c); + POProject prjKey = new POProject(OperatorKey.genOpKey(scope)); + prjKey.setResultType(DataType.INTEGER); + List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>(); + PhysicalPlan pp = new PhysicalPlan(); + pp.add(prjKey); + clrInps.add(pp); + POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER); + clr.setOutputKey(combineBloomOpKey); + edge.combinePlan.addAsLeaf(clr); + + if (createBloomInMap) { + // No combiner needed on map as there will be only one bloom filter per map for each partition + // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager + edge.setCombinerInMap(false); + edge.setCombinerInReducer(true); + } else { + pkgr.setBloomKeyType(op.getPkgr().getKeyType()); + // Do distinct of the keys on the map side to reduce data sent to reducers. + // In case of reduce, not adding a combiner and doing the distinct during reduce itself. 
+ // If needed one can be added later + edge.setCombinerInMap(true); + edge.setCombinerInReducer(false); + } + + // Broadcast the final bloom filter to other inputs + for (TezOperator applyBloomOp : applyBloomOps) { + applyBloomOp.markFilterBloom(); + lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0); + POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters); + applyBloomOp.plan.remove(lr); + applyBloomOp.plan.addAsLeaf(bfr); + bfr.setInputKey(combineBloomOpKey); + edge = new TezEdgeDescriptor(); + edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge); + combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString()); + } + + } + @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ @@ -1513,7 +1653,7 @@ public class TezCompiler extends PhyPlan for (int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(true); + flat1.add(i == transformPlans.size() - 1 ? 
true : false); } // This foreach will pick the sort key columns from the POPoissonSample output @@ -1722,7 +1862,7 @@ public class TezCompiler extends PhyPlan * @return * @throws IOException */ - private FileSpec getTempFileSpec() throws IOException { + public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException { return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), new FuncSpec(Utils.getTmpFileCompressorName(pigContext))); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Wed Feb 22 09:43:41 2017 @@ -31,8 +31,13 @@ import org.apache.tez.runtime.library.ou * Descriptor for Tez edge. It holds combine plan as well as edge properties. */ public class TezEdgeDescriptor implements Serializable { - // Combiner runs on both input and output of Tez edge. 
- transient public PhysicalPlan combinePlan; + + public transient PhysicalPlan combinePlan; + private boolean needsDistinctCombiner; + // Combiner runs on both input and output of Tez edge by default + // It can be configured to run only in output(map) or input(reduce) + private Boolean combinerInMap; + private Boolean combinerInReducer; public String inputClassName; public String outputClassName; @@ -65,6 +70,30 @@ public class TezEdgeDescriptor implement dataMovementType = DataMovementType.SCATTER_GATHER; } + public boolean needsDistinctCombiner() { + return needsDistinctCombiner; + } + + public void setNeedsDistinctCombiner(boolean nic) { + needsDistinctCombiner = nic; + } + + public Boolean getCombinerInMap() { + return combinerInMap; + } + + public void setCombinerInMap(Boolean combinerInMap) { + this.combinerInMap = combinerInMap; + } + + public Boolean getCombinerInReducer() { + return combinerInReducer; + } + + public void setCombinerInReducer(Boolean combinerInReducer) { + this.combinerInReducer = combinerInReducer; + } + public boolean isUseSecondaryKey() { return useSecondaryKey; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Wed Feb 22 09:43:41 2017 @@ -25,8 +25,9 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import 
java.util.Set; @@ -217,8 +218,12 @@ public class TezOperPlan extends Operato newPlan.add(node); } - Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>(); - for (TezOperator from : mFromEdges.keySet()) { + // Using a LinkedHashSet and doing a sort so that + // test plan printed remains same between jdk7 and jdk8 + Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>(); + List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet()); + Collections.sort(fromEdges); + for (TezOperator from : fromEdges) { List<TezOperator> tos = mFromEdges.get(from); for (TezOperator to : tos) { if (list.contains(from) || list.contains(to)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Feb 22 09:43:41 2017 @@ -181,7 +181,11 @@ public class TezOperator extends Operato // Indicate if this job is a native job NATIVE, // Indicate if this job does rank counter - RANK_COUNTER; + RANK_COUNTER, + // Indicate if this job constructs bloom filter + BUILDBLOOM, + // Indicate if this job applies bloom filter + FILTERBLOOM; }; // Features in the job/vertex. Mostly will be only one feature. 
@@ -235,6 +239,7 @@ public class TezOperator extends Operato } private LoaderInfo loaderInfo = new LoaderInfo(); + private long totalInputFilesSize = -1; public TezOperator(OperatorKey k) { super(k); @@ -452,6 +457,22 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.RANK_COUNTER.ordinal()); } + public boolean isBuildBloom() { + return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal()); + } + + public void markBuildBloom() { + feature.set(OPER_FEATURE.BUILDBLOOM.ordinal()); + } + + public boolean isFilterBloom() { + return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal()); + } + + public void markFilterBloom() { + feature.set(OPER_FEATURE.FILTERBLOOM.ordinal()); + } + public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { for (OPER_FEATURE opf : OPER_FEATURE.values()) { if (excludeFeatures != null && excludeFeatures.contains(opf)) { @@ -651,6 +672,14 @@ public class TezOperator extends Operato return loaderInfo; } + public long getTotalInputFilesSize() { + return totalInputFilesSize; + } + + public void setTotalInputFilesSize(long totalInputFilesSize) { + this.totalInputFilesSize = totalInputFilesSize; + } + public void setUseGraceParallelism(boolean useGraceParallelism) { this.useGraceParallelism = useGraceParallelism; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Wed Feb 22 09:43:41 2017 @@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten @Override public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange; - if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) { + if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) { return; } loRearrangeFound++; @@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten if(keyInfo == null) keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - Integer index = Integer.valueOf(lrearrange.getIndex()); + // For BloomPackager there is only one input, but the + // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero + Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 
0 : Integer.valueOf(lrearrange.getIndex()); if(keyInfo.get(index) != null) { if (isPOSplit) { // Case of POSplit having more than one input in case of self join or union @@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten } - keyInfo.put(index, - new Pair<Boolean, Map<Integer, Integer>>( - lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); - pkg.getPkgr().setKeyInfo(keyInfo); - pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); - pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + if (pkg.getPkgr() instanceof BloomPackager ) { + keyInfo.put(index, + new Pair<Boolean, Map<Integer, Integer>>( + Boolean.FALSE, new HashMap<Integer, Integer>())); + pkg.getPkgr().setKeyInfo(keyInfo); + } else { + keyInfo.put(index, + new Pair<Boolean, Map<Integer, Integer>>( + lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); + } + } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Wed Feb 22 09:43:41 2017 @@ -29,9 +29,14 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import 
org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.PlanException; @@ -160,100 +165,178 @@ public class TezPlanContainer extends Op return; } - TezOperator operToSegment = null; - List<TezOperator> succs = new ArrayList<TezOperator>(); + List<TezOperator> opersToSegment = null; try { // Split top down from root to leaves - SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan); + // Get list of operators closer to the root that can be segmented together + FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan); finder.visit(); - operToSegment = finder.getOperatorToSegment(); + opersToSegment = finder.getOperatorsToSegment(); } catch (VisitorException e) { throw new PlanException(e); } + if (!opersToSegment.isEmpty()) { + Set<TezOperator> commonSplitterPredecessors = new HashSet<>(); + for (TezOperator operToSegment : opersToSegment) { + for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) { + commonSplitterPredecessors + .addAll(getCommonSplitterPredecessors(tezOperPlan, + operToSegment, succ)); + } + } - if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) { - succs.addAll(tezOperPlan.getSuccessors(operToSegment)); - for (TezOperator succ : succs) { - tezOperPlan.disconnect(operToSegment, succ); - } - for (TezOperator succ : succs) { - try { - if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) { - // Has already been moved to a new plan by previous successor - // as part of dependency. It could have been further split. 
- // So walk the full plan to find the new plan and connect - TezOperatorFinder finder = new TezOperatorFinder(this, succ); - finder.visit(); - connect(planNode, finder.getPlanContainerNode()); - continue; + if (commonSplitterPredecessors.isEmpty()) { + List<TezOperator> allSuccs = new ArrayList<TezOperator>(); + // Disconnect all the successors and move them to a new plan + for (TezOperator operToSegment : opersToSegment) { + List<TezOperator> succs = new ArrayList<TezOperator>(); + succs.addAll(tezOperPlan.getSuccessors(operToSegment)); + allSuccs.addAll(succs); + for (TezOperator succ : succs) { + tezOperPlan.disconnect(operToSegment, succ); } - TezOperPlan newOperPlan = new TezOperPlan(); + } + TezOperPlan newOperPlan = new TezOperPlan(); + for (TezOperator succ : allSuccs) { tezOperPlan.moveTree(succ, newOperPlan); - TezPlanContainerNode newPlanNode = new TezPlanContainerNode( - generateNodeOperatorKey(), newOperPlan); - add(newPlanNode); - connect(planNode, newPlanNode); - split(newPlanNode); - if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) { - // On further split, the successor moved to a new plan container. - // Connect to that - TezOperatorFinder finder = new TezOperatorFinder(this, succ); - finder.visit(); - disconnect(planNode, newPlanNode); - connect(planNode, finder.getPlanContainerNode()); + } + TezPlanContainerNode newPlanNode = new TezPlanContainerNode( + generateNodeOperatorKey(), newOperPlan); + add(newPlanNode); + connect(planNode, newPlanNode); + split(newPlanNode); + } else { + // If there is a common splitter predecessor between operToSegment and the successor, + // we have to separate out that split to be able to segment. + // So we store the output of split to a temp store and then change the + // splittees to load from it. 
+ String scope = opersToSegment.get(0).getOperatorKey().getScope(); + for (TezOperator splitter : commonSplitterPredecessors) { + try { + List<TezOperator> succs = new ArrayList<TezOperator>(); + succs.addAll(tezOperPlan.getSuccessors(splitter)); + FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext); + POStore tmpStore = getTmpStore(scope, fileSpec); + // Replace POValueOutputTez with POStore + splitter.plan.remove(splitter.plan.getLeaves().get(0)); + splitter.plan.addAsLeaf(tmpStore); + splitter.segmentBelow = true; + splitter.setSplitter(false); + for (TezOperator succ : succs) { + // Replace POValueInputTez with POLoad + POLoad tmpLoad = getTmpLoad(scope, fileSpec); + succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad); + } + } catch (Exception e) { + throw new PlanException(e); } - } catch (VisitorException e) { - throw new PlanException(e); } } split(planNode); } } - private static class SegmentOperatorFinder extends TezOpPlanVisitor { + private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor { - private TezOperator operToSegment; + private List<TezOperator> opersToSegment = new ArrayList<>(); - public SegmentOperatorFinder(TezOperPlan plan) { + public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); } - public TezOperator getOperatorToSegment() { - return operToSegment; + public List<TezOperator> getOperatorsToSegment() { + return opersToSegment; } @Override - public void visitTezOp(TezOperator tezOperator) throws VisitorException { - if (tezOperator.needSegmentBelow() && operToSegment == null) { - operToSegment = tezOperator; + public void visitTezOp(TezOperator tezOp) throws VisitorException { + if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) { + if (opersToSegment.isEmpty()) { + opersToSegment.add(tezOp); + } else { + // If the operator does not have dependency on previous + // operators chosen for segmenting then add it to 
the + // operators to be segmented together + if (!hasPredecessor(tezOp, opersToSegment)) { + opersToSegment.add(tezOp); + } + } } } - } - - private static class TezOperatorFinder extends TezPlanContainerVisitor { + /** + * Check if the tezOp has one of the opsToCheck as a predecessor. + * It can be a immediate predecessor or multiple levels up. + */ + private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) { + List<TezOperator> predecessors = getPlan().getPredecessors(tezOp); + if (predecessors != null) { + for (TezOperator pred : predecessors) { + if (opersToSegment.contains(pred)) { + return true; + } else { + if (hasPredecessor(pred, opsToCheck)) { + return true; + } + } + } + } + return false; + } - private TezPlanContainerNode planContainerNode; - private TezOperator operatorToFind; + } - public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) { - super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan)); - this.operatorToFind = operatorToFind; + private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) { + Set<TezOperator> splitters1 = new HashSet<>(); + Set<TezOperator> splitters2 = new HashSet<>(); + Set<TezOperator> processedPredecessors = new HashSet<>(); + // Find predecessors which are splitters + fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1); + if (!splitters1.isEmpty()) { + // For the successor, traverse rest of the plan below it and + // search the predecessors of its successors to find any predecessor that might be a splitter. 
+ Set<TezOperator> allSuccs = new HashSet<>(); + getAllSuccessors(plan, successor, allSuccs); + processedPredecessors.clear(); + processedPredecessors.add(successor); + for (TezOperator succ : allSuccs) { + fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2); + } + // Find the common ones + splitters1.retainAll(splitters2); } + return splitters1; + } - public TezPlanContainerNode getPlanContainerNode() { - return planContainerNode; + private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp, + Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) { + List<TezOperator> predecessors = plan.getPredecessors(tezOp); + if (predecessors != null) { + for (TezOperator pred : predecessors) { + // Skip processing already processed predecessor to avoid loops + if (processedPredecessors.contains(pred)) { + continue; + } + if (pred.isSplitter()) { + splitters.add(pred); + } else if (!pred.needSegmentBelow()) { + processedPredecessors.add(pred); + fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters); + } + } } + } - @Override - public void visitTezPlanContainerNode( - TezPlanContainerNode tezPlanContainerNode) - throws VisitorException { - if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) { - planContainerNode = tezPlanContainerNode; + private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) { + List<TezOperator> successors = plan.getSuccessors(tezOp); + if (successors != null) { + for (TezOperator succ : successors) { + if (!allSuccs.contains(succ)) { + allSuccs.add(succ); + getAllSuccessors(plan, succ, allSuccs); + } } } - } private synchronized OperatorKey generateNodeOperatorKey() { @@ -267,6 +350,21 @@ public class TezPlanContainer extends Op scopeId = 0; } + private POLoad getTmpLoad(String scope, FileSpec fileSpec){ + POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); + 
ld.setPc(pigContext); + ld.setIsTmpLoad(true); + ld.setLFile(fileSpec); + return ld; + } + + private POStore getTmpStore(String scope, FileSpec fileSpec){ + POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); + st.setIsTmpStore(true); + st.setSFile(fileSpec); + return new POStoreTez(st); + } + @Override public String toString() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Wed Feb 22 09:43:41 2017 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; @@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla printer.setVerbose(isVerbose); printer.visit(); mStream.println(); + } else if (edgeDesc.needsDistinctCombiner()) { + mStream.println("# Combine plan on edge <" + inEdge + ">"); + mStream.println(DistinctCombiner.Combine.class.getName()); } } } Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; +import org.apache.pig.builtin.BuildBloomBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +public class BloomPackager extends Packager { + + private static final long serialVersionUID = 1L; + + private boolean bloomCreatedInMap; + private int vectorSizeBytes; + private int numHash; + private int hashType; + private byte bloomKeyType; + private boolean isCombiner; + + private transient ByteArrayOutputStream baos; + private transient Iterator<Object> distinctKeyIter; + + public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes, + int numHash, int hashType) { + super(); + this.bloomCreatedInMap = bloomCreatedInMap; + this.vectorSizeBytes = vectorSizeBytes; + this.numHash = numHash; + this.hashType = hashType; + } + + public void setBloomKeyType(byte keyType) { + bloomKeyType = keyType; + } + + public void setCombiner(boolean isCombiner) { + this.isCombiner = isCombiner; + } + + @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + + @Override + public Result getNext() throws ExecException { + try { + if (bloomCreatedInMap) { 
+ if (bags == null) { + return new Result(POStatus.STATUS_EOP, null); + } + // Same function for combiner and reducer + return combineBloomFilters(); + } else { + if (isCombiner) { + return getDistinctBloomKeys(); + } else { + if (bags == null) { + return new Result(POStatus.STATUS_EOP, null); + } + return createBloomFilter(); + } + } + } catch (IOException e) { + throw new ExecException("Error while constructing final bloom filter", e); + } + } + + private Result combineBloomFilters() throws IOException { + // We get a bag of bloom filters. combine them into one + Iterator<Tuple> iter = bags[0].iterator(); + Tuple tup = iter.next(); + DataByteArray bloomBytes = (DataByteArray) tup.get(0); + BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes); + while (iter.hasNext()) { + tup = iter.next(); + bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0))); + } + + Object partition = key; + detachInput(); // Free up the key and bags reference + + return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length); + } + + private Result createBloomFilter() throws IOException { + // We get a bag of keys. Create a bloom filter from them + // First do distinct of the keys. Not using DistinctBag as memory should not be a problem. 
+ HashSet<Object> bloomKeys = new HashSet<>(); + Iterator<Tuple> iter = bags[0].iterator(); + while (iter.hasNext()) { + bloomKeys.add(iter.next().get(0)); + } + + Object partition = key; + detachInput(); // Free up the key and bags reference + + BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + for (Object bloomKey: bloomKeys) { + Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType)); + bloomFilter.add(k); + } + bloomKeys = null; + return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64); + + } + + private Result getSerializedBloomFilter(Object partition, + BloomFilter bloomFilter, int serializedSize) throws ExecException, + IOException { + if (baos == null) { + baos = new ByteArrayOutputStream(serializedSize); + } + baos.reset(); + DataOutputStream dos = new DataOutputStream(baos); + bloomFilter.write(dos); + dos.flush(); + + Tuple res = mTupleFactory.newTuple(2); + res.set(0, partition); + res.set(1, new DataByteArray(baos.toByteArray())); + + Result r = new Result(); + r.result = res; + r.returnStatus = POStatus.STATUS_OK; + return r; + } + + private Result getDistinctBloomKeys() throws ExecException { + if (distinctKeyIter == null) { + HashSet<Object> bloomKeys = new HashSet<>(); + Iterator<Tuple> iter = bags[0].iterator(); + while (iter.hasNext()) { + bloomKeys.add(iter.next().get(0)); + } + distinctKeyIter = bloomKeys.iterator(); + } + while (distinctKeyIter.hasNext()) { + Tuple res = mTupleFactory.newTuple(2); + res.set(0, key); + res.set(1, distinctKeyIter.next()); + + Result r = new Result(); + r.result = res; + r.returnStatus = POStatus.STATUS_OK; + return r; + } + distinctKeyIter = null; + return new Result(POStatus.STATUS_EOP, null); + } +} Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.builtin.BuildBloomBase; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput { + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class); + private String inputKey; + private transient KeyValueReader reader; + private transient String cacheKey; + private int numBloomFilters; + private transient BloomFilter[] bloomFilters; + + public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) { + super(lr); + this.numBloomFilters = numBloomFilters; + } + + public void setInputKey(String inputKey) { + this.inputKey = inputKey; + } + + @Override + public String[]
getTezInputs() { + return new String[] { inputKey }; + } + + @Override + public void replaceInput(String oldInputKey, String newInputKey) { + if (oldInputKey.equals(inputKey)) { + inputKey = newInputKey; + } + } + + @Override + public void addInputsToSkip(Set<String> inputsToSkip) { + cacheKey = "bloom-" + inputKey; + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + inputsToSkip.add(inputKey); + } + } + + @Override + public void attachInputs(Map<String, LogicalInput> inputs, + Configuration conf) throws ExecException { + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + bloomFilters = (BloomFilter[]) cacheValue; + return; + } + LogicalInput input = inputs.get(inputKey); + if (input == null) { + throw new ExecException("Input from vertex " + inputKey + " is missing"); + } + try { + reader = (KeyValueReader) input.getReader(); + LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); + while (reader.next()) { + if (bloomFilters == null) { + bloomFilters = new BloomFilter[numBloomFilters]; + } + Tuple val = (Tuple) reader.getCurrentValue(); + int index = (int) val.get(0); + bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1)); + } + ObjectCache.getInstance().cache(cacheKey, bloomFilters); + } catch (Exception e) { + throw new ExecException(e); + } + } + + @Override + public Result getNextTuple() throws ExecException { + + // If there is no bloom filter, then it means right input was empty + // Skip processing + if (bloomFilters == null) { + return RESULT_EOP; + } + + while (true) { + res = super.getRearrangedTuple(); + try { + switch (res.returnStatus) { + case POStatus.STATUS_OK: + if (illustrator == null) { + Tuple result = (Tuple) res.result; + Byte index = (Byte) result.get(0); + + // Skip the record if key is not in the bloom filter + if (!isKeyInBloomFilter(result.get(1))) { + continue; + } + 
PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType); + NullableTuple val = new NullableTuple((Tuple)result.get(2)); + key.setIndex(index); + val.setIndex(index); + writer.write(key, val); + } else { + illustratorMarkup(res.result, res.result, 0); + } + continue; + case POStatus.STATUS_NULL: + continue; + case POStatus.STATUS_EOP: + case POStatus.STATUS_ERR: + default: + return res; + } + } catch (IOException ioe) { + int errCode = 2135; + String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage(); + throw new ExecException(msg, errCode, ioe); + } + } + } + + private boolean isKeyInBloomFilter(Object key) throws ExecException { + if (key == null) { + // Null values are dropped in a inner join and in the case of outer join, + // POBloomFilterRearrangeTez is only in the plan on the non outer relation. + // So just skip them + return false; + } + if (bloomFilters.length == 1) { + // Skip computing hashcode + Key k = new Key(DataType.toBytes(key, keyType)); + return bloomFilters[0].membershipTest(k); + } else { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + BloomFilter filter = bloomFilters[partition]; + if (filter != null) { + Key k = new Key(DataType.toBytes(key, keyType)); + return filter.membershipTest(k); + } + return false; + } + } + + @Override + public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException { + return (POBloomFilterRearrangeTez) super.clone(); + } + + @Override + public String name() { + return getAliasString() + "BloomFilter Rearrange" + "[" + + DataType.findTypeName(resultType) + "]" + "{" + + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct + + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey; + } + +} Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableIntWritable; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +/** + * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez. + * In addition, it also writes out the bloom filter constructed from the join keys + * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy. + * + * Using multiple bloom filters partitioned by the hash of the key allows for parallelism. + * It also allows us to have lower false positives with smaller vector sizes. 
+ * + */ [email protected] [email protected] +public class POBuildBloomRearrangeTez extends POLocalRearrangeTez { + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class); + + public static final String DEFAULT_BLOOM_STRATEGY = "map"; + public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11; + public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3; + public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur"; + public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024; + + private String bloomOutputKey; + private boolean skipNullKeys = false; + private boolean createBloomInMap; + private int numBloomFilters; + private int vectorSizeBytes; + private int numHash; + private int hashType; + + private transient BloomFilter[] bloomFilters; + private transient KeyValueWriter bloomWriter; + private transient PigNullableWritable nullKey; + private transient Tuple bloomValue; + private transient NullableTuple bloomNullableTuple; + + public POBuildBloomRearrangeTez(POLocalRearrangeTez lr, + boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes, + int numHash, int hashType) { + super(lr); + this.createBloomInMap = createBloomInMap; + this.numBloomFilters = numBloomFilters; + this.vectorSizeBytes = vectorSizeBytes; + this.numHash = numHash; + this.hashType = hashType; + } + + public static int getNumBloomFilters(Configuration conf) { + if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) { + return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1); + } else { + return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE); + } + } + + public void setSkipNullKeys(boolean skipNullKeys) { + this.skipNullKeys = skipNullKeys; + } + + public void setBloomOutputKey(String bloomOutputKey) { + this.bloomOutputKey = bloomOutputKey; + } + + @Override + public boolean 
containsOutputKey(String key) { + if(super.containsOutputKey(key)) { + return true; + } + return bloomOutputKey.equals(key); + } + + @Override + public String[] getTezOutputs() { + return new String[] { outputKey, bloomOutputKey }; + } + + @Override + public void replaceOutput(String oldOutputKey, String newOutputKey) { + if (oldOutputKey.equals(outputKey)) { + outputKey = newOutputKey; + } else if (oldOutputKey.equals(bloomOutputKey)) { + bloomOutputKey = newOutputKey; + } + } + + @Override + public void attachOutputs(Map<String, LogicalOutput> outputs, + Configuration conf) throws ExecException { + super.attachOutputs(outputs, conf); + LogicalOutput output = outputs.get(bloomOutputKey); + if (output == null) { + throw new ExecException("Output to vertex " + bloomOutputKey + " is missing"); + } + try { + bloomWriter = (KeyValueWriter) output.getWriter(); + LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter); + } catch (Exception e) { + throw new ExecException(e); + } + bloomFilters = new BloomFilter[numBloomFilters]; + bloomValue = mTupleFactory.newTuple(1); + bloomNullableTuple = new NullableTuple(bloomValue); + } + + @Override + public Result getNextTuple() throws ExecException { + + PigNullableWritable key; + while (true) { + res = super.getRearrangedTuple(); + try { + switch (res.returnStatus) { + case POStatus.STATUS_OK: + if (illustrator == null) { + Tuple result = (Tuple) res.result; + Byte index = (Byte) result.get(0); + + Object keyObj = result.get(1); + if (keyObj != null) { + key = HDataType.getWritableComparableTypes(keyObj, keyType); + // null keys cannot be part of bloom filter + // Since they are also dropped during join we can skip them + if (createBloomInMap) { + addKeyToBloomFilter(keyObj); + } else { + writeJoinKeyForBloom(keyObj); + } + } else if (skipNullKeys) { + // Inner join. 
So don't bother writing null key + continue; + } else { + if (nullKey == null) { + nullKey = HDataType.getWritableComparableTypes(keyObj, keyType); + } + key = nullKey; + } + + NullableTuple val = new NullableTuple((Tuple)result.get(2)); + key.setIndex(index); + val.setIndex(index); + writer.write(key, val); + } else { + illustratorMarkup(res.result, res.result, 0); + } + continue; + case POStatus.STATUS_NULL: + continue; + case POStatus.STATUS_EOP: + if (this.parentPlan.endOfAllInput && createBloomInMap) { + // In case of Split will get EOP after every record. + // So check for endOfAllInput + writeBloomFilters(); + } + case POStatus.STATUS_ERR: + default: + return res; + } + } catch (IOException ioe) { + int errCode = 2135; + String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage(); + throw new ExecException(msg, errCode, ioe); + } + } + } + + private void addKeyToBloomFilter(Object key) throws ExecException { + Key k = new Key(DataType.toBytes(key, keyType)); + if (bloomFilters.length == 1) { + if (bloomFilters[0] == null) { + bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + } + bloomFilters[0].add(k); + } else { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + BloomFilter filter = bloomFilters[partition]; + if (filter == null) { + filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + bloomFilters[partition] = filter; + } + filter.add(k); + } + } + + private void writeJoinKeyForBloom(Object key) throws IOException { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + bloomValue.set(0, key); + bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple); + } + + private void writeBloomFilters() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64); + for (int i = 0; i < bloomFilters.length; i++) { + if (bloomFilters[i] != null) { + DataOutputStream dos = new DataOutputStream(baos); + 
bloomFilters[i].write(dos); + dos.flush(); + bloomValue.set(0, new DataByteArray(baos.toByteArray())); + bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple); + baos.reset(); + } + } + } + + @Override + public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException { + return (POBuildBloomRearrangeTez) super.clone(); + } + + @Override + public String name() { + return getAliasString() + "BuildBloom Rearrange" + "[" + + DataType.findTypeName(resultType) + "]" + "{" + + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct + + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]"; + } + +}
