No explicit caching when hash table is cached. Add tests for explicit cache removal when hash table is cached.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/52f9cf63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/52f9cf63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/52f9cf63 Branch: refs/heads/master Commit: 52f9cf63e66a53592ea952c154e9e28ab4bce3b7 Parents: a9b1daa Author: Stephan Ewen <[email protected]> Authored: Wed Jul 9 03:10:45 2014 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 9 03:19:24 2014 +0200 ---------------------------------------------------------------------- .../eu/stratosphere/compiler/dag/TempMode.java | 11 ++++++++ .../operators/HashJoinBuildFirstProperties.java | 6 +++++ .../HashJoinBuildSecondProperties.java | 9 ++++++- .../CachedMatchStrategyCompilerTest.java | 28 +++++++++++++------- 4 files changed, 44 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java index 7907af7..a698422 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java @@ -63,4 +63,15 @@ public enum TempMode { return this; } } + + + public TempMode makeNonCached() { + if (this == CACHED) { + return NONE; + } else if (this == CACHING_PIPELINE_BREAKER) { + return PIPELINE_BREAKER; + } else { + return this; + } + } } 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java index 4f694c5..f8a7c38 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java @@ -17,6 +17,7 @@ import java.util.Collections; import java.util.List; import eu.stratosphere.api.common.operators.util.FieldList; +import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.dag.TwoInputNode; import eu.stratosphere.compiler.dataproperties.LocalProperties; import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties; @@ -57,6 +58,11 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { DriverStrategy strategy; if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) { + // sanity check that the first input is cached and remove that cache + if (!in1.getTempMode().isCached()) { + throw new CompilerException("No cache at point where static and dynamic parts meet."); + } + in1.setTempMode(in1.getTempMode().makeNonCached()); strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED; } else { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java 
b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java index 6bea65a..6982ce6 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java @@ -17,6 +17,7 @@ import java.util.Collections; import java.util.List; import eu.stratosphere.api.common.operators.util.FieldList; +import eu.stratosphere.compiler.CompilerException; import eu.stratosphere.compiler.dag.TwoInputNode; import eu.stratosphere.compiler.dataproperties.LocalProperties; import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties; @@ -53,7 +54,13 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { DriverStrategy strategy; - if(!in2.isOnDynamicPath() && in1.isOnDynamicPath()) { + if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) { + // sanity check that the second input is cached and remove that cache + if (!in2.getTempMode().isCached()) { + throw new CompilerException("No cache at point where static and dynamic parts meet."); + } + + in2.setTempMode(in2.getTempMode().makeNonCached()); strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED; } else { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java index b82c0f9..33fc6e4 100644 --- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java +++ 
b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java @@ -28,6 +28,7 @@ import eu.stratosphere.api.java.IterativeDataSet; import eu.stratosphere.api.java.functions.JoinFunction; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.compiler.PactCompiler; +import eu.stratosphere.compiler.dag.TempMode; import eu.stratosphere.compiler.plan.DualInputPlanNode; import eu.stratosphere.compiler.plan.OptimizedPlan; import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator; @@ -57,6 +58,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { // verify correct join strategy assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); + assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); + assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); new NepheleJobGraphGenerator().compileJobGraph(oPlan); } @@ -83,6 +86,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { // verify correct join strategy assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy()); + assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); + assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode()); new NepheleJobGraphGenerator().compileJobGraph(oPlan); } @@ -109,7 +114,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy()); + assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); + assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); new NepheleJobGraphGenerator().compileJobGraph(oPlan); } @@ -135,7 +142,9 @@ public class CachedMatchStrategyCompilerTest extends 
CompilerTestBase { DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy()); + assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode()); + assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); new NepheleJobGraphGenerator().compileJobGraph(oPlan); } @@ -169,13 +178,16 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { } } + OptimizedPlan oPlan = compileNoStats(plan); OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan); DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner"); // verify correct join strategy - assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); + assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); + assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode()); + assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode()); new NepheleJobGraphGenerator().compileJobGraph(oPlan); } @@ -197,16 +209,14 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase { IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10); - DataSet<Tuple3<Long, Long, Long>> inner; + Configuration joinStrategy = new Configuration(); + joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH); if(strategy != "") { - Configuration joinStrategy = new Configuration(); joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy); - inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); - } - else { - inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner"); } + + DataSet<Tuple3<Long, Long, Long>> inner = 
iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy); DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
