Repository: flink
Updated Branches:
  refs/heads/master f429b4cde -> 92026ff39


[FLINK-8475][config][docs] Integrate Algorithm options

This closes #5460.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/023ab749
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/023ab749
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/023ab749

Branch: refs/heads/master
Commit: 023ab749cc2d82fcf729d3e3f8052c60280f0af3
Parents: f429b4c
Author: zentol <ches...@apache.org>
Authored: Tue Jan 30 14:06:30 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Mon Feb 12 17:03:02 2018 +0100

----------------------------------------------------------------------
 .../generated/algorithm_configuration.html      | 26 +++++++++++
 docs/ops/config.md                              |  6 +--
 .../flink/configuration/AlgorithmOptions.java   | 47 ++++++++++++++++++++
 .../plantranslate/JobGraphGenerator.java        | 11 +++--
 .../AbstractCachedBuildSideJoinDriver.java      |  7 ++-
 .../flink/runtime/operators/JoinDriver.java     |  7 ++-
 6 files changed, 85 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/docs/_includes/generated/algorithm_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/algorithm_configuration.html 
b/docs/_includes/generated/algorithm_configuration.html
new file mode 100644
index 0000000..c1406ee
--- /dev/null
+++ b/docs/_includes/generated/algorithm_configuration.html
@@ -0,0 +1,26 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>taskmanager.runtime.hashjoin-bloom-filters</h5></td>
+            <td>false</td>
+            <td>Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.runtime.max-fan</h5></td>
+            <td>128</td>
+            <td>The maximal fan-in for external merge joins and fan-out for 
spilling hash tables. Limits the number of file handles per operator, but may 
cause intermediate merging/partitioning, if set too small.</td>
+        </tr>
+        <tr>
+            <td><h5>taskmanager.runtime.sort-spilling-threshold</h5></td>
+            <td>0.8</td>
+            <td>A sort operation starts spilling when this fraction of its 
memory budget is full.</td>
+        </tr>
+    </tbody>
+</table>

http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 41f0349..3236199 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -393,11 +393,7 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 
 ### Runtime Algorithms
 
-- `taskmanager.runtime.hashjoin-bloom-filters`: Flag to activate/deactivate 
bloom filters in the hybrid hash join implementation. In cases where the hash 
join needs to spill to disk (datasets larger than the reserved fraction of 
memory), these bloom filters can greatly reduce the number of spilled records, 
at the cost some CPU cycles. (DEFAULT: false)
-
-- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins 
and fan-out for spilling hash tables. Limits the number of file handles per 
operator, but may cause intermediate merging/partitioning, if set too small 
(DEFAULT: 128).
-
-- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts 
spilling when this fraction of its memory budget is full (DEFAULT: 0.8).
+{% include generated/algorithm_configuration.html %}
 
 ### Resource Manager
 

http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java
new file mode 100644
index 0000000..f44432c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration parameters for join/sort algorithms.
+ */
+public class AlgorithmOptions {
+
+	public static final ConfigOption<Boolean> HASH_JOIN_BLOOM_FILTERS =
+		key("taskmanager.runtime.hashjoin-bloom-filters")
+			.defaultValue(false)
+			.withDescription("Flag to activate/deactivate bloom filters in the hybrid hash join implementation." +
+				" In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of" +
+				" memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some" +
+				" CPU cycles.");
+
+	public static final ConfigOption<Integer> SPILLING_MAX_FAN =
+		key("taskmanager.runtime.max-fan")
+			.defaultValue(128)
+			.withDescription("The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits" +
+				" the number of file handles per operator, but may cause intermediate merging/partitioning, if set too" +
+				" small.");
+
+	public static final ConfigOption<Float> SORT_SPILLING_THRESHOLD =
+		key("taskmanager.runtime.sort-spilling-threshold")
+			.defaultValue(0.8f)
+			.withDescription("A sort operation starts spilling when this fraction of its memory budget is full.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 248049e..ae20567 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.AlgorithmOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TempMode;
@@ -145,16 +146,14 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
         * Creates a new job graph generator that uses the default values for 
its resource configuration.
         */
        public JobGraphGenerator() {
-               this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN;
-               this.defaultSortSpillingThreshold = 
ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD;
+               this.defaultMaxFan = 
AlgorithmOptions.SPILLING_MAX_FAN.defaultValue();
+               this.defaultSortSpillingThreshold = 
AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue();
                this.useLargeRecordHandler = 
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER;
        }
        
        public JobGraphGenerator(Configuration config) {
-               this.defaultMaxFan = 
config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, 
-                       ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
-               this.defaultSortSpillingThreshold = 
config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
-                       ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
+               this.defaultMaxFan = 
config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN);
+               this.defaultSortSpillingThreshold = 
config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
                this.useLargeRecordHandler = config.getBoolean(
                                ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
                                
ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);

http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 712018b..ff8351c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AlgorithmOptions;
 import org.apache.flink.metrics.Counter;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator;
 import 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator;
@@ -80,9 +80,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, 
IT2, OT> extends Jo
                                
this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader());
 
                double availableMemory = config.getRelativeMemoryDriver();
-               boolean hashJoinUseBitMaps = 
taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
-                               
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
-                               
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
+               boolean hashJoinUseBitMaps = 
taskContext.getTaskManagerInfo().getConfiguration()
+                       .getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS);
                
                ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
                objectReuseEnabled = executionConfig.isObjectReuseEnabled();

http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
index b8cb545..585dc0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.AlgorithmOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -123,9 +123,8 @@ public class JoinDriver<IT1, IT2, OT> implements 
Driver<FlatJoinFunction<IT1, IT
                        LOG.debug("Join Driver object reuse: " + 
(objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
                }
                
-               boolean hashJoinUseBitMaps = 
taskContext.getTaskManagerInfo().getConfiguration().getBoolean(
-                               
ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY,
-                               
ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS);
+               boolean hashJoinUseBitMaps = 
taskContext.getTaskManagerInfo().getConfiguration()
+                       .getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS);
 
                // create and return joining iterator according to provided 
local strategy.
                if (objectReuseEnabled) {

Reply via email to