No explicit caching when hash table is cached
Add tests for explicit cache removal when hash table is cached


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/52f9cf63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/52f9cf63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/52f9cf63

Branch: refs/heads/master
Commit: 52f9cf63e66a53592ea952c154e9e28ab4bce3b7
Parents: a9b1daa
Author: Stephan Ewen <[email protected]>
Authored: Wed Jul 9 03:10:45 2014 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 9 03:19:24 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/compiler/dag/TempMode.java  | 11 ++++++++
 .../operators/HashJoinBuildFirstProperties.java |  6 +++++
 .../HashJoinBuildSecondProperties.java          |  9 ++++++-
 .../CachedMatchStrategyCompilerTest.java        | 28 +++++++++++++-------
 4 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
index 7907af7..a698422 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
@@ -63,4 +63,15 @@ public enum TempMode {
                        return this;
                }
        }
+       
+       
+       public TempMode makeNonCached() {
+               if (this == CACHED) {
+                       return NONE;
+               } else if (this == CACHING_PIPELINE_BREAKER) {
+                       return PIPELINE_BREAKER;
+               } else {
+                       return this;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
index 4f694c5..f8a7c38 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
@@ -17,6 +17,7 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
@@ -57,6 +58,11 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
                DriverStrategy strategy;
                
                if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) {
+                       // sanity check that the first input is cached and remove that cache
+                       if (!in1.getTempMode().isCached()) {
+                               throw new CompilerException("No cache at point where static and dynamic parts meet.");
+                       }
+                       in1.setTempMode(in1.getTempMode().makeNonCached());
                        strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
                }
                else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
index 6bea65a..6982ce6 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
@@ -17,6 +17,7 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
@@ -53,7 +54,13 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
        public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
                DriverStrategy strategy;
                
-               if(!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
+               if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
+                       // sanity check that the first input is cached and remove that cache
+                       if (!in2.getTempMode().isCached()) {
+                               throw new CompilerException("No cache at point where static and dynamic parts meet.");
+                       }
+                       
+                       in2.setTempMode(in2.getTempMode().makeNonCached());
                        strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
                }
                else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
index b82c0f9..33fc6e4 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,6 +28,7 @@ import eu.stratosphere.api.java.IterativeDataSet;
 import eu.stratosphere.api.java.functions.JoinFunction;
 import eu.stratosphere.api.java.tuple.Tuple3;
 import eu.stratosphere.compiler.PactCompiler;
+import eu.stratosphere.compiler.dag.TempMode;
 import eu.stratosphere.compiler.plan.DualInputPlanNode;
 import eu.stratosphere.compiler.plan.OptimizedPlan;
 import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
@@ -57,6 +58,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                        
                        // verify correct join strategy
                        assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); 
+                       assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+                       assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
                
                        new NepheleJobGraphGenerator().compileJobGraph(oPlan);
                }
@@ -83,6 +86,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                        
                        // verify correct join strategy
                        assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy()); 
+                       assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+                       assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
                
                        new NepheleJobGraphGenerator().compileJobGraph(oPlan);
                }
@@ -109,7 +114,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                        DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
                        
                        // verify correct join strategy
-                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy()); 
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
+                       assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+                       assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
                
                        new NepheleJobGraphGenerator().compileJobGraph(oPlan);
                }
@@ -135,7 +142,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                        DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
                        
                        // verify correct join strategy
-                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy()); 
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
+                       assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
+                       assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
                
                        new NepheleJobGraphGenerator().compileJobGraph(oPlan);
                }
@@ -169,13 +178,16 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                                }
                        }
                        
+                       
                        OptimizedPlan oPlan = compileNoStats(plan);
        
                        OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
                        DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
                        
                        // verify correct join strategy
-                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); 
+                       assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
+                       assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+                       assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
                
                        new NepheleJobGraphGenerator().compileJobGraph(oPlan);
                }
@@ -197,16 +209,14 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
                
                IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
                
-               DataSet<Tuple3<Long, Long, Long>> inner;
+               Configuration joinStrategy = new Configuration();
+               joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
                
                if(strategy != "") {
-                       Configuration joinStrategy = new Configuration();
                        joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
-                       inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-               }
-               else {
-                       inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner");
                }
+               
+               DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
 
                DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
                

Reply via email to